54 figure out a better alternative for the celery subprocess in subprocess problem #58

Merged
6 changes: 4 additions & 2 deletions dev-requirements.txt
@@ -80,7 +80,9 @@ coolprop==6.6.0
# -c requirements.txt
# mesido
coverage[toml]==7.4.1
# via pytest-cov
# via
# coverage
# pytest-cov
exceptiongroup==1.2.0
# via pytest
flake8==6.1.0
@@ -158,7 +160,7 @@ omotes-sdk-protocol==0.0.8
# via
# -c requirements.txt
# omotes-sdk-python
omotes-sdk-python==0.0.15
omotes-sdk-python==0.0.16
# via
# -c requirements.txt
# omotes-grow-worker (pyproject.toml)
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -26,7 +26,7 @@ requires-python = ">=3.10"
dependencies = [
"python-dotenv ~= 1.0.0",
"mesido ~= 0.1.3.1",
"omotes-sdk-python ~= 0.0.15"
"omotes-sdk-python ~= 0.0.16"
]

[project.optional-dependencies]
@@ -66,7 +66,7 @@ requires = [
enabled = true

[tool.pytest.ini_options]
addopts = "--cov=grow_worker --cov-report html --cov-report term-missing --cov-fail-under 25"
addopts = "--cov=grow_worker --cov-report html --cov-report term-missing --cov-fail-under 15"

[tool.coverage.run]
source = ["src"]
2 changes: 1 addition & 1 deletion requirements.txt
@@ -71,7 +71,7 @@ numpy==1.22.4
# scipy
omotes-sdk-protocol==0.0.8
# via omotes-sdk-python
omotes-sdk-python==0.0.15
omotes-sdk-python==0.0.16
# via omotes-grow-worker (pyproject.toml)
ordered-set==4.1.0
# via pyecore
95 changes: 74 additions & 21 deletions src/grow_worker/worker.py
@@ -1,6 +1,9 @@
import base64
import logging
import multiprocessing
import multiprocessing.pool
import signal
import time
from multiprocessing.process import current_process
import os
from pathlib import Path
@@ -23,6 +26,16 @@
GROW_TASK_TYPE = GrowTaskType(os.environ.get("GROW_TASK_TYPE"))


class EarlySystemExit(Exception):
Member Author commented:
Moved from omotes-sdk-python to here. Celery also uses the SystemExit mechanism to stop the running worker gracefully, so previously the SystemExit would be caught there and the worker process would not terminate as expected. (An illustrative sketch of the pattern follows the run_mesido changes below.)

"""Wrapper for `SystemExit` exception.

To ensure that the worker process does not shutdown but rather handles the `SystemExit` as an
error
"""

...


def run_mesido(input_esdl: str) -> str:
"""Run mesido using the specific workflow.

@@ -40,30 +53,65 @@ def run_mesido(input_esdl: str) -> str:
influxdb_host = os.environ.get("INFLUXDB_HOSTNAME", "localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT", "8086"))

logger.info("Will write result profiles to influx: {}. At {}:{}",
write_result_db_profiles,
influxdb_host,
influxdb_port)

solution: GROWProblem = mesido_func(
mesido_workflow,
base_folder=base_folder,
esdl_string=base64.encodebytes(input_esdl.encode("utf-8")),
esdl_parser=ESDLStringParser,
write_result_db_profiles=write_result_db_profiles,
influxdb_host=influxdb_host,
influxdb_port=influxdb_port,
influxdb_username=os.environ.get("INFLUXDB_USERNAME"),
influxdb_password=os.environ.get("INFLUXDB_PASSWORD"),
influxdb_ssl=False,
influxdb_verify_ssl=False,
update_progress_function=None,
profile_reader=InfluxDBProfileReader,
logger.info(
"Will write result profiles to influx: {}. At {}:{}",
write_result_db_profiles,
influxdb_host,
influxdb_port,
)
try:
solution: GROWProblem = mesido_func(
mesido_workflow,
base_folder=base_folder,
esdl_string=base64.encodebytes(input_esdl.encode("utf-8")),
esdl_parser=ESDLStringParser,
write_result_db_profiles=write_result_db_profiles,
influxdb_host=influxdb_host,
influxdb_port=influxdb_port,
influxdb_username=os.environ.get("INFLUXDB_USERNAME"),
influxdb_password=os.environ.get("INFLUXDB_PASSWORD"),
influxdb_ssl=False,
influxdb_verify_ssl=False,
update_progress_function=None,
profile_reader=InfluxDBProfileReader,
)
except SystemExit as e:
Member Author commented:
This is only triggered when mesido raises the SystemExit, as mesido runs in a subprocess here (see the sketch below).

raise EarlySystemExit(e)

return cast(str, solution.optimized_esdl_string)
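
Below is a minimal, illustrative sketch (not part of this PR) of the pattern used above: a function running in a multiprocessing.Pool worker catches SystemExit and re-raises it as a plain Exception subclass, so the parent receives it from pool.map as an ordinary error instead of the child process exiting on its own. The names EarlySystemExitSketch and child_job are hypothetical.

# Illustrative sketch only; not part of the diff.
import multiprocessing


class EarlySystemExitSketch(Exception):
    """Stand-in for EarlySystemExit: a plain Exception wrapping a SystemExit."""


def child_job(_: int) -> str:
    try:
        raise SystemExit(1)  # stand-in for mesido calling sys.exit() inside the subprocess
    except SystemExit as e:
        # Re-raise as a regular Exception so the pool worker loop (which only
        # catches Exception, not SystemExit) reports it back to the parent.
        raise EarlySystemExitSketch(e)


if __name__ == "__main__":
    with multiprocessing.Pool(1) as pool:
        try:
            pool.map(child_job, [0])
        except EarlySystemExitSketch as e:
            print(f"Parent handled the subprocess SystemExit as a normal error: {e!r}")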


def kill_pool(pool: multiprocessing.pool.Pool) -> None:
"""Terminate all the process of a multiprocessing.Pool with SIGKILL.

Found here: https://stackoverflow.com/a/47580796

multiprocessing.Pool.terminate does not provide a way to send a signal other than SIGTERM, so
this function hooks into the pool internals to send SIGKILL to every process in the pool.

:param pool: The multiprocessing.Pool whose processes should be killed.
"""
# Stop the pool from repopulating new child processes.
pool._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined]
pool._worker_handler._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined]
for p in pool._pool: # type: ignore[attr-defined]
if p.is_alive():
logger.warning("Sending SIGKILL to pool process with pid %s", p.pid)
os.kill(p.pid, signal.SIGKILL)
# .is_alive() will reap dead processes
wait_till = time.time() + 5.0
while (
any(p.is_alive() for p in pool._pool) # type: ignore[attr-defined]
and time.time() < wait_till
):
pass
logger.warning("All processes in pool have been terminated.")
pool.terminate()
logger.warning("Forceful pool termination completed.")


def grow_worker_task(
input_esdl: str, params_dict: ParamsDict, update_progress_handler: UpdateProgressHandler
) -> str:
@@ -85,10 +133,15 @@ def grow_worker_task(
# subprocesses from being created. This does introduce the issue that if this
# process is killed/cancelled/revoked, the subprocess will continue as a zombie process.
# See https://github.com/Project-OMOTES/optimizer-worker/issues/54
current_process()._config['daemon'] = False # type: ignore[attr-defined]
current_process()._config["daemon"] = False # type: ignore[attr-defined]

with multiprocessing.Pool(1) as pool:
output_esdl = pool.map(run_mesido, [input_esdl])[0]
try:
output_esdl = pool.map(run_mesido, [input_esdl])[0]
except SystemExit as e:
Member Author commented:
This is triggered when Celery throws a SystemExit to cancel the current task. The handler terminates the mesido worker subprocess with SIGKILL so that it does not wait for a graceful exit; casadi blocks any graceful shutdown of the process because it does not release the GIL.

logger.warning("During pool the worker was requested to quit: %s %s", type(e), e)
kill_pool(pool)
raise

return output_esdl
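
For context on the current_process()._config["daemon"] workaround above, here is a minimal sketch (not part of this PR, relying on the same undocumented _config attribute): a daemonic process is normally not allowed to create child processes, which is the restriction the comment above works around, and clearing the flag lifts it.

# Illustrative sketch only; not part of the diff.
import multiprocessing
from multiprocessing.process import current_process


def spawn_grandchild() -> None:
    try:
        multiprocessing.Process(target=print, args=("grandchild ran",)).start()
    except AssertionError as e:
        # CPython currently raises "daemonic processes are not allowed to have children" here.
        print(f"blocked: {e}")
        # The workaround used in grow_worker_task: clear the inherited daemon flag.
        current_process()._config["daemon"] = False
        child = multiprocessing.Process(target=print, args=("grandchild ran",))
        child.start()
        child.join()


if __name__ == "__main__":
    parent = multiprocessing.Process(target=spawn_grandchild, daemon=True)
    parent.start()
    parent.join()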
