Skip to content

Commit

Permalink
Merge pull request #58 from Project-OMOTES/54-figure-out-a-better-alternative-for-the-celery-subprocess-in-subprocess-problem
Browse files Browse the repository at this point in the history

54 figure out a better alternative for the celery subprocess in subprocess problem
  • Loading branch information
lfse-slafleur authored Jun 17, 2024
2 parents cae5f27 + 000e94c commit 905d491
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 26 deletions.
6 changes: 4 additions & 2 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ coolprop==6.6.0
# -c requirements.txt
# mesido
coverage[toml]==7.4.1
# via pytest-cov
# via
# coverage
# pytest-cov
exceptiongroup==1.2.0
# via pytest
flake8==6.1.0
Expand Down Expand Up @@ -158,7 +160,7 @@ omotes-sdk-protocol==0.0.8
# via
# -c requirements.txt
# omotes-sdk-python
omotes-sdk-python==0.0.15
omotes-sdk-python==0.0.16
# via
# -c requirements.txt
# omotes-grow-worker (pyproject.toml)
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ requires-python = ">=3.10"
dependencies = [
"python-dotenv ~= 1.0.0",
"mesido ~= 0.1.3.1",
"omotes-sdk-python ~= 0.0.15"
"omotes-sdk-python ~= 0.0.16"
]

[project.optional-dependencies]
Expand Down Expand Up @@ -66,7 +66,7 @@ requires = [
enabled = true

[tool.pytest.ini_options]
addopts = "--cov=grow_worker --cov-report html --cov-report term-missing --cov-fail-under 25"
addopts = "--cov=grow_worker --cov-report html --cov-report term-missing --cov-fail-under 15"

[tool.coverage.run]
source = ["src"]
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ numpy==1.22.4
# scipy
omotes-sdk-protocol==0.0.8
# via omotes-sdk-python
omotes-sdk-python==0.0.15
omotes-sdk-python==0.0.16
# via omotes-grow-worker (pyproject.toml)
ordered-set==4.1.0
# via pyecore
Expand Down
95 changes: 74 additions & 21 deletions src/grow_worker/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import base64
import logging
import multiprocessing
import multiprocessing.pool
import signal
import time
from multiprocessing.process import current_process
import os
from pathlib import Path
Expand All @@ -23,6 +26,16 @@
GROW_TASK_TYPE = GrowTaskType(os.environ.get("GROW_TASK_TYPE"))


class EarlySystemExit(Exception):
    """Stand-in exception raised in place of a `SystemExit`.

    Raising this instead of letting the `SystemExit` propagate ensures that the worker process
    does not shut down but rather handles the `SystemExit` as an error.
    """


def run_mesido(input_esdl: str) -> str:
"""Run mesido using the specific workflow.
Expand All @@ -40,30 +53,65 @@ def run_mesido(input_esdl: str) -> str:
influxdb_host = os.environ.get("INFLUXDB_HOSTNAME", "localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT", "8086"))

logger.info("Will write result profiles to influx: {}. At {}:{}",
write_result_db_profiles,
influxdb_host,
influxdb_port)

solution: GROWProblem = mesido_func(
mesido_workflow,
base_folder=base_folder,
esdl_string=base64.encodebytes(input_esdl.encode("utf-8")),
esdl_parser=ESDLStringParser,
write_result_db_profiles=write_result_db_profiles,
influxdb_host=influxdb_host,
influxdb_port=influxdb_port,
influxdb_username=os.environ.get("INFLUXDB_USERNAME"),
influxdb_password=os.environ.get("INFLUXDB_PASSWORD"),
influxdb_ssl=False,
influxdb_verify_ssl=False,
update_progress_function=None,
profile_reader=InfluxDBProfileReader,
logger.info(
"Will write result profiles to influx: {}. At {}:{}",
write_result_db_profiles,
influxdb_host,
influxdb_port,
)
try:
solution: GROWProblem = mesido_func(
mesido_workflow,
base_folder=base_folder,
esdl_string=base64.encodebytes(input_esdl.encode("utf-8")),
esdl_parser=ESDLStringParser,
write_result_db_profiles=write_result_db_profiles,
influxdb_host=influxdb_host,
influxdb_port=influxdb_port,
influxdb_username=os.environ.get("INFLUXDB_USERNAME"),
influxdb_password=os.environ.get("INFLUXDB_PASSWORD"),
influxdb_ssl=False,
influxdb_verify_ssl=False,
update_progress_function=None,
profile_reader=InfluxDBProfileReader,
)
except SystemExit as e:
raise EarlySystemExit(e)

return cast(str, solution.optimized_esdl_string)


def kill_pool(pool: multiprocessing.pool.Pool) -> None:
    """Terminate all the processes of a multiprocessing.Pool with SIGKILL.

    Found here: https://stackoverflow.com/a/47580796

    multiprocessing.Pool.terminate does not provide a way to give a different signal than SIGTERM
    so this function hooks into the internals to properly handle sending SIGKILL to all processes in
    the pool.

    After sending the signals, this function waits up to 5 seconds for the processes to exit and
    then calls `pool.terminate()` regardless of the outcome.

    :param pool: The multiprocessing pool to kill all processes in.
    """
    # Stop the pool from repopulating new child processes.
    pool._state = multiprocessing.pool.TERMINATE  # type: ignore[attr-defined]
    pool._worker_handler._state = multiprocessing.pool.TERMINATE  # type: ignore[attr-defined]

    # Snapshot the worker list so the kill loop and the wait loop look at the same processes.
    workers = list(pool._pool)  # type: ignore[attr-defined]
    for p in workers:
        if p.is_alive():
            logger.warning("Sending SIGKILL to pool process with pid %s", p.pid)
            try:
                os.kill(p.pid, signal.SIGKILL)
            except ProcessLookupError:
                # The process exited between the is_alive() check and the kill; nothing to do.
                pass

    # .is_alive() will reap dead processes. Use a monotonic clock so wall-clock adjustments
    # cannot shorten or lengthen the wait, and sleep between polls instead of spinning.
    deadline = time.monotonic() + 5.0
    while any(p.is_alive() for p in workers) and time.monotonic() < deadline:
        time.sleep(0.05)

    if any(p.is_alive() for p in workers):
        logger.warning("Timed out waiting for all processes in pool to terminate.")
    else:
        logger.warning("All processes in pool have been terminated.")
    pool.terminate()
    logger.warning("Forceful pool termination completed.")


def grow_worker_task(
input_esdl: str, params_dict: ParamsDict, update_progress_handler: UpdateProgressHandler
) -> str:
Expand All @@ -85,10 +133,15 @@ def grow_worker_task(
# subprocesses from being created. This does introduce the issue that if this
# process is killed/cancelled/revoked, the subprocess will continue as a zombie process.
# See https://github.com/Project-OMOTES/optimizer-worker/issues/54
current_process()._config['daemon'] = False # type: ignore[attr-defined]
current_process()._config["daemon"] = False # type: ignore[attr-defined]

with multiprocessing.Pool(1) as pool:
output_esdl = pool.map(run_mesido, [input_esdl])[0]
try:
output_esdl = pool.map(run_mesido, [input_esdl])[0]
except SystemExit as e:
logger.warning("During pool the worker was requested to quit: %s %s", type(e), e)
kill_pool(pool)
raise

return output_esdl

Expand Down

0 comments on commit 905d491

Please sign in to comment.