From 0f8dfcf6e177ce9c1baa5c9a70a3aeec04f33e90 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Tue, 14 May 2024 10:03:03 +0100 Subject: [PATCH 1/6] simplify keep_alive --- spinnman/spalloc/spalloc_client.py | 109 +++++++++-------------------- spinnman/spalloc/spalloc_job.py | 23 ------ 2 files changed, 34 insertions(+), 98 deletions(-) diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py index 8ed750490..3503987cb 100644 --- a/spinnman/spalloc/spalloc_client.py +++ b/spinnman/spalloc/spalloc_client.py @@ -14,16 +14,15 @@ """ Implementation of the client for the Spalloc web service. """ - -from contextlib import contextmanager +import time from logging import getLogger -from multiprocessing import Process, Queue + import queue import struct import threading from time import sleep -from typing import (Any, ContextManager, Callable, Dict, FrozenSet, Iterable, - Iterator, List, Mapping, Optional, Tuple, cast) +from typing import (Any, Callable, Dict, FrozenSet, Iterable, List, Mapping, + Optional, Tuple, cast) from urllib.parse import urlparse, urlunparse, ParseResult from packaging.version import Version @@ -69,6 +68,7 @@ _msg = struct.Struct(" str: """ @@ -199,7 +199,7 @@ def _create(self, create: Mapping[str, JsonValue], def create_job( self, num_boards: int = 1, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "num-boards": int(num_boards), "keepalive-interval": f"PT{int(keepalive)}S" @@ -209,7 +209,7 @@ def create_job( def create_job_rect( self, width: int, height: int, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "dimensions": { "width": int(width), @@ -224,7 +224,7 @@ def create_job_board( physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: board: JsonObject if triad: x, y, z = triad @@ -248,7 +248,7 @@ def create_job_rect_at_board( triad: Optional[Tuple[int, int, int]] = None, physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, - machine_name: Optional[str] = None, keepalive: int = 45, + machine_name: Optional[str] = None, keepalive = KEEP_ALIVE_PERIOND, max_dead_boards: int = 0) -> SpallocJob: board: JsonObject if triad: @@ -285,27 +285,6 @@ class _ProxyServiceError(IOError): """ -def _spalloc_keepalive(url, interval, term_queue, cookies, headers): - """ - Actual keepalive task implementation. Don't use directly. - """ - headers["Content-Type"] = "text/plain; charset=UTF-8" - while True: - requests.put(url, data="alive", cookies=cookies, headers=headers, - allow_redirects=False, timeout=10) - try: - term_queue.get(True, interval) - break - except queue.Empty: - continue - # On ValueError or OSError, just terminate the keepalive process - # They happen when the term_queue is directly closed - except ValueError: - break - except OSError: - break - - class _SpallocMachine(SessionAware, SpallocMachine): """ Represents a Spalloc-controlled machine. @@ -507,7 +486,7 @@ class _SpallocJob(SessionAware, SpallocJob): Don't make this yourself. Use :py:class:`SpallocClient` instead. 
""" __slots__ = ("__machine_url", "__chip_url", - "_keepalive_url", "__keepalive_handle", "__proxy_handle", + "_keepalive_url", "__proxy_handle", "__proxy_thread", "__proxy_ping") def __init__(self, session: Session, job_handle: str): @@ -520,10 +499,11 @@ def __init__(self, session: Session, job_handle: str): self.__machine_url = self._url + "machine" self.__chip_url = self._url + "chip" self._keepalive_url = self._url + "keepalive" - self.__keepalive_handle: Optional[Queue] = None self.__proxy_handle: Optional[WebSocket] = None self.__proxy_thread: Optional[_ProxyReceiver] = None self.__proxy_ping: Optional[_ProxyPing] = None + keep_alive = threading.Thread(target=self.__start_keepalive, daemon=True) + keep_alive.start() @overrides(SpallocJob.get_session_credentials_for_db) def get_session_credentials_for_db(self) -> Mapping[Tuple[str, str], str]: @@ -651,9 +631,7 @@ def wait_until_ready(self, timeout: Optional[int] = None, @overrides(SpallocJob.destroy) def destroy(self, reason: str = "finished"): - if self.__keepalive_handle: - self.__keepalive_handle.close() - self.__keepalive_handle = None + self._keepalive_url = None if self.__proxy_handle is not None: if self.__proxy_thread: self.__proxy_thread.close() @@ -663,38 +641,29 @@ def destroy(self, reason: str = "finished"): self._delete(self._url, reason=str(reason)) logger.info("deleted job at {}", self._url) - @overrides(SpallocJob.keepalive) - def keepalive(self) -> None: - self._put(self._keepalive_url, "alive") - - @overrides(SpallocJob.launch_keepalive_task, extend_doc=True) - def launch_keepalive_task( - self, period: float = 30) -> ContextManager[Process]: + def __keepalive(self) -> bool: """ - .. note:: - Tricky! *Cannot* be done with a thread, as the main thread is known - to do significant amounts of CPU-intensive work. - """ - if self.__keepalive_handle is not None: - raise SpallocException("cannot keep job alive from two tasks") - q: Queue = Queue(1) - p = Process(target=_spalloc_keepalive, args=( - self._keepalive_url, 0 + period, q, - *self._session_credentials), daemon=True) - p.start() - self.__keepalive_handle = q - return self.__closer(q, p) - - @contextmanager - def __closer(self, q: Queue, p: Process) -> Iterator[Process]: + Signal the that we want it to stay alive for a while longer. 
+ + :return: True if the job has not been destroyed + :rtype: bool + """ + if self._keepalive_url is None: + print("False") + return False + cookies, headers = self._session_credentials + headers["Content-Type"] = "text/plain; charset=UTF-8" + logger.debug(self._keepalive_url) + requests.put(self._keepalive_url, data="alive", cookies=cookies, + headers=headers, allow_redirects=False, timeout=10) + return True + + def __start_keepalive(self) -> None: try: - yield p - finally: - q.put("quit") - # Give it a second, and if it still isn't dead, kill it - p.join(1) - if p.is_alive(): - p.kill() + while self.__keepalive(): + time.sleep(KEEP_ALIVE_PERIOND / 2) + except Exception as ex: # pylint: disable=broad-except + logger.exception(ex) @overrides(SpallocJob.where_is_machine) def where_is_machine(self, x: int, y: int) -> Optional[ @@ -705,16 +674,6 @@ def where_is_machine(self, x: int, y: int) -> Optional[ return cast(Tuple[int, int, int], tuple( r.json()["physical-board-coordinates"])) - @property - def _keepalive_handle(self) -> Optional[Queue]: - return self.__keepalive_handle - - @_keepalive_handle.setter - def _keepalive_handle(self, handle: Queue): - if self.__keepalive_handle is not None: - raise SpallocException("cannot keep job alive from two tasks") - self.__keepalive_handle = handle - @overrides(SpallocJob.create_transceiver) def create_transceiver(self) -> Transceiver: if self.get_state() != SpallocState.READY: diff --git a/spinnman/spalloc/spalloc_job.py b/spinnman/spalloc/spalloc_job.py index 14f5ba429..d7b9a907d 100644 --- a/spinnman/spalloc/spalloc_job.py +++ b/spinnman/spalloc/spalloc_job.py @@ -182,29 +182,6 @@ def destroy(self, reason: str = "finished"): """ raise NotImplementedError() - @abstractmethod - def keepalive(self) -> None: - """ - Signal the job that we want it to stay alive for a while longer. - """ - raise NotImplementedError() - - @abstractmethod - def launch_keepalive_task( - self, period: int = 30) -> AbstractContextManager: - """ - Starts a periodic task to keep a job alive. - - :param SpallocJob job: - The job to keep alive - :param int period: - How often to send a keepalive message (in seconds) - :return: - Some kind of closeable task handle; closing it terminates the task. - Destroying the job will also terminate the task. - """ - raise NotImplementedError() - @abstractmethod def where_is_machine(self, x: int, y: int) -> Optional[ Tuple[int, int, int]]: From b89c3f444146e21dc4b318a84942a9337ed5edac Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Tue, 14 May 2024 10:26:08 +0100 Subject: [PATCH 2/6] add typing more typing flake8 typing fix copyright --- manual_scripts/get_triad.py | 58 ++++++++++++++++++++++++++++++ spinnman/spalloc/spalloc_client.py | 16 +++++---- spinnman/spalloc/spalloc_job.py | 1 - 3 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 manual_scripts/get_triad.py diff --git a/manual_scripts/get_triad.py b/manual_scripts/get_triad.py new file mode 100644 index 000000000..27d1d6f8d --- /dev/null +++ b/manual_scripts/get_triad.py @@ -0,0 +1,58 @@ +# Copyright (c) 2014 The University of Manchester +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from spinn_utilities.config_holder import set_config +from spinnman.spalloc import SpallocClient +from spinnman.config_setup import unittest_setup + + +SPALLOC_URL = "https://spinnaker.cs.man.ac.uk/spalloc" +SPALLOC_USERNAME = "" +SPALLOC_PASSWORD = "" + +SPALLOC_MACHINE = "SpiNNaker1M" + +x = 0 +y = 3 +b = 0 # Must be 0 if requesting a rect +RECT = True +WIDTH = 1 # In triads! +HEIGHT = 1 # In triads! + +unittest_setup() +set_config("Machine", "version",5) +client = SpallocClient(SPALLOC_URL, SPALLOC_USERNAME, SPALLOC_PASSWORD) +if RECT: + job = client.create_job_rect_at_board( + WIDTH, HEIGHT, triad=(x, y, b), machine_name=SPALLOC_MACHINE, + max_dead_boards=1) +else: + job = client.create_job_board( + triad=(x, y, b), machine_name=SPALLOC_MACHINE) +print(job) +print("Waiting until ready...") +with job: + job.wait_until_ready() + print(job.get_connections()) + + txrx = job.create_transceiver() + # This call is for testing and can be changed without notice! + dims = txrx._get_machine_dimensions() + print(f"{dims.height=}, {dims.width=}") + + machine = txrx.get_machine_details() + print(machine) + + input("Press Enter to release...") +client.close()#print(2)#print(2^(1/(2^1))) \ No newline at end of file diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py index 3503987cb..7e588004b 100644 --- a/spinnman/spalloc/spalloc_client.py +++ b/spinnman/spalloc/spalloc_client.py @@ -70,6 +70,7 @@ KEEP_ALIVE_PERIOND = 30 + def fix_url(url: Any) -> str: """ Makes sure the url is the correct format. 
@@ -199,7 +200,7 @@ def _create(self, create: Mapping[str, JsonValue], def create_job( self, num_boards: int = 1, machine_name: Optional[str] = None, - keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: + keepalive: int = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "num-boards": int(num_boards), "keepalive-interval": f"PT{int(keepalive)}S" @@ -209,7 +210,7 @@ def create_job( def create_job_rect( self, width: int, height: int, machine_name: Optional[str] = None, - keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: + keepalive: int = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "dimensions": { "width": int(width), @@ -224,7 +225,7 @@ def create_job_board( physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, machine_name: Optional[str] = None, - keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: + keepalive: int = KEEP_ALIVE_PERIOND) -> SpallocJob: board: JsonObject if triad: x, y, z = triad @@ -248,7 +249,8 @@ def create_job_rect_at_board( triad: Optional[Tuple[int, int, int]] = None, physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, - machine_name: Optional[str] = None, keepalive = KEEP_ALIVE_PERIOND, + machine_name: Optional[str] = None, + keepalive: int = KEEP_ALIVE_PERIOND, max_dead_boards: int = 0) -> SpallocJob: board: JsonObject if triad: @@ -498,11 +500,12 @@ def __init__(self, session: Session, job_handle: str): logger.info("established job at {}", job_handle) self.__machine_url = self._url + "machine" self.__chip_url = self._url + "chip" - self._keepalive_url = self._url + "keepalive" + self._keepalive_url: Optional[str] = self._url + "keepalive" self.__proxy_handle: Optional[WebSocket] = None self.__proxy_thread: Optional[_ProxyReceiver] = None self.__proxy_ping: Optional[_ProxyPing] = None - keep_alive = threading.Thread(target=self.__start_keepalive, daemon=True) + keep_alive = threading.Thread( + target=self.__start_keepalive, daemon=True) keep_alive.start() @overrides(SpallocJob.get_session_credentials_for_db) @@ -649,7 +652,6 @@ def __keepalive(self) -> bool: :rtype: bool """ if self._keepalive_url is None: - print("False") return False cookies, headers = self._session_credentials headers["Content-Type"] = "text/plain; charset=UTF-8" diff --git a/spinnman/spalloc/spalloc_job.py b/spinnman/spalloc/spalloc_job.py index d7b9a907d..04ffa8715 100644 --- a/spinnman/spalloc/spalloc_job.py +++ b/spinnman/spalloc/spalloc_job.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from contextlib import AbstractContextManager from typing import Dict, Mapping, Optional, Tuple from spinn_utilities.abstract_base import AbstractBase, abstractmethod from spinnman.constants import SCP_SCAMP_PORT From f856594da16226afe750c4c5f6113a217ff116bd Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 15 May 2024 12:02:29 +0100 Subject: [PATCH 3/6] increase keep alive period to 2 minutes --- spinnman/spalloc/spalloc_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py index 7e588004b..a26d95da3 100644 --- a/spinnman/spalloc/spalloc_client.py +++ b/spinnman/spalloc/spalloc_client.py @@ -68,7 +68,7 @@ _msg = struct.Struct(" str: From 614bc2a9383770623f1de744c274aca4bca6b198 Mon Sep 17 00:00:00 2001 From: "Christian Y. 
Brenninkmeijer" Date: Wed, 15 May 2024 12:04:55 +0100 Subject: [PATCH 4/6] copy right date --- manual_scripts/get_triad.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manual_scripts/get_triad.py b/manual_scripts/get_triad.py index 27d1d6f8d..370fe47bd 100644 --- a/manual_scripts/get_triad.py +++ b/manual_scripts/get_triad.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014 The University of Manchester +# Copyright (c) 2024 The University of Manchester # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From f1e021699a190c247d123ce5b04a0c01cb2797c7 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 15 May 2024 12:08:13 +0100 Subject: [PATCH 5/6] comment fixes --- spinnman/spalloc/spalloc_client.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py index a26d95da3..a82f9bcf6 100644 --- a/spinnman/spalloc/spalloc_client.py +++ b/spinnman/spalloc/spalloc_client.py @@ -646,11 +646,11 @@ def destroy(self, reason: str = "finished"): def __keepalive(self) -> bool: """ - Signal the that we want it to stay alive for a while longer. + Signal spalloc that we want the job to stay alive for a while longer. - :return: True if the job has not been destroyed - :rtype: bool - """ + :return: False if the job has not been destroyed + :rtype: bool + """ if self._keepalive_url is None: return False cookies, headers = self._session_credentials @@ -661,6 +661,10 @@ def __keepalive(self) -> bool: return True def __start_keepalive(self) -> None: + """ + Method for keep alive thread to start the keep alive class + + """ try: while self.__keepalive(): time.sleep(KEEP_ALIVE_PERIOND / 2) From 6a20a4f3b9b9eb4dc6b7007b10b12dd0c7391b3c Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 15 May 2024 12:24:06 +0100 Subject: [PATCH 6/6] flake8 --- spinnman/spalloc/spalloc_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py index a82f9bcf6..d2a58ace4 100644 --- a/spinnman/spalloc/spalloc_client.py +++ b/spinnman/spalloc/spalloc_client.py @@ -646,10 +646,10 @@ def destroy(self, reason: str = "finished"): def __keepalive(self) -> bool: """ - Signal spalloc that we want the job to stay alive for a while longer. + Signal spalloc that we want the job to stay alive for a while longer. - :return: False if the job has not been destroyed - :rtype: bool + :return: False if the job has not been destroyed + :rtype: bool """ if self._keepalive_url is None: return False