From 88f1532235bca78a52dae290fb82c614c33eabce Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Mon, 17 Jun 2024 15:48:35 +0200 Subject: [PATCH 1/2] 54: Move mesido to subprocess multiprocessing.Pool which is terminated upon a cancel and encapsulate SystemExit exceptions which are thrown by Mesido into an EarlySystemExit. --- pyproject.toml | 4 +- src/grow_worker/worker.py | 95 ++++++++++++++++++++++++++++++--------- 2 files changed, 76 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 11f50c7..7dbff5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ requires-python = ">=3.10" dependencies = [ "python-dotenv ~= 1.0.0", "mesido ~= 0.1.3.1", - "omotes-sdk-python ~= 0.0.15" + "omotes-sdk-python ~= 0.0.16" ] [project.optional-dependencies] @@ -66,7 +66,7 @@ requires = [ enabled = true [tool.pytest.ini_options] -addopts = "--cov=grow_worker --cov-report html --cov-report term-missing --cov-fail-under 25" +addopts = "--cov=grow_worker --cov-report html --cov-report term-missing --cov-fail-under 15" [tool.coverage.run] source = ["src"] diff --git a/src/grow_worker/worker.py b/src/grow_worker/worker.py index 03d814a..0f2925b 100644 --- a/src/grow_worker/worker.py +++ b/src/grow_worker/worker.py @@ -1,6 +1,9 @@ import base64 import logging import multiprocessing +import multiprocessing.pool +import signal +import time from multiprocessing.process import current_process import os from pathlib import Path @@ -23,6 +26,16 @@ GROW_TASK_TYPE = GrowTaskType(os.environ.get("GROW_TASK_TYPE")) +class EarlySystemExit(Exception): + """Wrapper for `SystemExit` exception. + + To ensure that the worker process does not shut down but rather handles the `SystemExit` as an + error. + """ + + ... + + def run_mesido(input_esdl: str) -> str: """Run mesido using the specific workflow.
@@ -40,30 +53,65 @@ def run_mesido(input_esdl: str) -> str: influxdb_host = os.environ.get("INFLUXDB_HOSTNAME", "localhost") influxdb_port = int(os.environ.get("INFLUXDB_PORT", "8086")) - logger.info("Will write result profiles to influx: {}. At {}:{}", - write_result_db_profiles, - influxdb_host, - influxdb_port) - - solution: GROWProblem = mesido_func( - mesido_workflow, - base_folder=base_folder, - esdl_string=base64.encodebytes(input_esdl.encode("utf-8")), - esdl_parser=ESDLStringParser, - write_result_db_profiles=write_result_db_profiles, - influxdb_host=influxdb_host, - influxdb_port=influxdb_port, - influxdb_username=os.environ.get("INFLUXDB_USERNAME"), - influxdb_password=os.environ.get("INFLUXDB_PASSWORD"), - influxdb_ssl=False, - influxdb_verify_ssl=False, - update_progress_function=None, - profile_reader=InfluxDBProfileReader, + logger.info( + "Will write result profiles to influx: %s. At %s:%s", + write_result_db_profiles, + influxdb_host, + influxdb_port, ) + try: + solution: GROWProblem = mesido_func( + mesido_workflow, + base_folder=base_folder, + esdl_string=base64.encodebytes(input_esdl.encode("utf-8")), + esdl_parser=ESDLStringParser, + write_result_db_profiles=write_result_db_profiles, + influxdb_host=influxdb_host, + influxdb_port=influxdb_port, + influxdb_username=os.environ.get("INFLUXDB_USERNAME"), + influxdb_password=os.environ.get("INFLUXDB_PASSWORD"), + influxdb_ssl=False, + influxdb_verify_ssl=False, + update_progress_function=None, + profile_reader=InfluxDBProfileReader, + ) + except SystemExit as e: + raise EarlySystemExit(e) from e return cast(str, solution.optimized_esdl_string) +def kill_pool(pool: multiprocessing.pool.Pool) -> None: + """Terminate all the processes of a multiprocessing.Pool with SIGKILL.
+ + Found here: https://stackoverflow.com/a/47580796 + + multiprocessing.Pool.terminate does not provide a way to give a different signal than SIGTERM + so this function hooks into the internals to properly handle sending SIGKILL to all processes in + the pool. + + :param pool: The multiprocessing pool to kill all processes in. + """ + # + # stop the pool from repopulating new child processes + pool._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined] + pool._worker_handler._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined] + for p in pool._pool: # type: ignore[attr-defined] + if p.is_alive(): + logger.warning("Sending SIGKILL to pool process with pid %s", p.pid) + os.kill(p.pid, signal.SIGKILL) + # .is_alive() will reap dead processes + wait_till = time.monotonic() + 5.0 + while ( + any(p.is_alive() for p in pool._pool) # type: ignore[attr-defined] + and time.monotonic() < wait_till + ): + time.sleep(0.05) # poll instead of busy-waiting on a full core + logger.warning("All processes in pool have been terminated.") + pool.terminate() + logger.warning("Forceful pool termination completed.") + + def grow_worker_task( input_esdl: str, params_dict: ParamsDict, update_progress_handler: UpdateProgressHandler ) -> str: @@ -85,10 +133,15 @@ def grow_worker_task( # subprocesses from being created. This does introduce the issue that if this # process is killed/cancelled/revoked, the subprocess will continue as a zombie process.
# See https://github.com/Project-OMOTES/optimizer-worker/issues/54 - current_process()._config['daemon'] = False # type: ignore[attr-defined] + current_process()._config["daemon"] = False # type: ignore[attr-defined] with multiprocessing.Pool(1) as pool: - output_esdl = pool.map(run_mesido, [input_esdl])[0] + try: + output_esdl = pool.map(run_mesido, [input_esdl])[0] + except SystemExit as e: + logger.warning("During pool the worker was requested to quit: %s %s", type(e), e) + kill_pool(pool) + raise return output_esdl From 000e94c764929348beaba9c8926a7bea1e76a4b7 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Mon, 17 Jun 2024 15:51:42 +0200 Subject: [PATCH 2/2] 54: Update dependencies and sdk. --- dev-requirements.txt | 6 ++++-- requirements.txt | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 585e0f2..abf5b62 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -80,7 +80,9 @@ coolprop==6.6.0 # -c requirements.txt # mesido coverage[toml]==7.4.1 - # via pytest-cov + # via + # coverage + # pytest-cov exceptiongroup==1.2.0 # via pytest flake8==6.1.0 @@ -158,7 +160,7 @@ omotes-sdk-protocol==0.0.8 # via # -c requirements.txt # omotes-sdk-python -omotes-sdk-python==0.0.15 +omotes-sdk-python==0.0.16 # via # -c requirements.txt # omotes-grow-worker (pyproject.toml) diff --git a/requirements.txt b/requirements.txt index 29c51f5..50842a7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -71,7 +71,7 @@ numpy==1.22.4 # scipy omotes-sdk-protocol==0.0.8 # via omotes-sdk-python -omotes-sdk-python==0.0.15 +omotes-sdk-python==0.0.16 # via omotes-grow-worker (pyproject.toml) ordered-set==4.1.0 # via pyecore