From 9dd1ba11628e59bf9d5dbd18143a2582e866e382 Mon Sep 17 00:00:00 2001 From: Gianluca Ficarelli Date: Fri, 5 Apr 2024 14:34:18 +0200 Subject: [PATCH] Add blueetl_core.parallel.isolated() Other: If the env variable ``BLUEETL_SUBPROCESS_LOGGING_LEVEL`` is empty or not defined, then the log level of the subprocesses is inherited from the parent. --- CHANGELOG.rst | 13 ++++++++++++ src/blueetl_core/constants.py | 3 ++- src/blueetl_core/parallel.py | 40 +++++++++++++++++++++++++++++++---- tests/test_parallel.py | 22 +++++++++++++++++++ 4 files changed, 73 insertions(+), 5 deletions(-) create mode 100644 tests/test_parallel.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 75193b2..16462b0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,19 @@ Changelog ========= +Version 0.2.1 +------------- + +New Features +~~~~~~~~~~~~ + +- Add ``blueetl_core.parallel.isolated()`` to execute a function in a separate subprocess. + +Improvements +~~~~~~~~~~~~ + +- If the env variable ``BLUEETL_SUBPROCESS_LOGGING_LEVEL`` is empty or not defined, then the log level of the subprocesses is inherited from the parent. + Version 0.2.0 ------------- diff --git a/src/blueetl_core/constants.py b/src/blueetl_core/constants.py index aa74618..72a6274 100644 --- a/src/blueetl_core/constants.py +++ b/src/blueetl_core/constants.py @@ -9,5 +9,6 @@ BLUEETL_JOBLIB_JOBS = "BLUEETL_JOBLIB_JOBS" # JobLib backend (loky, multiprocessing, threading). If not overridden, it uses by default: loky BLUEETL_JOBLIB_BACKEND = "BLUEETL_JOBLIB_BACKEND" -# Subprocess logging level. If empty or not defined, fallback to the default level WARNING. +# Logging level to be configured in subprocesses. +# If empty or not defined, use the effective log level of the parent process (recommended). BLUEETL_SUBPROCESS_LOGGING_LEVEL = "BLUEETL_SUBPROCESS_LOGGING_LEVEL" diff --git a/src/blueetl_core/parallel.py b/src/blueetl_core/parallel.py index f14eb1e..69985d5 100644 --- a/src/blueetl_core/parallel.py +++ b/src/blueetl_core/parallel.py @@ -4,6 +4,7 @@ import os from collections.abc import Iterable from dataclasses import dataclass +from functools import partial from typing import Any, Callable, Optional import numpy as np @@ -42,10 +43,9 @@ def __init__(self, func: Callable) -> None: def _setup_logging(self, ctx: TaskContext) -> None: """Initialize logging in a subprocess.""" - loglevel = os.getenv(BLUEETL_SUBPROCESS_LOGGING_LEVEL) - if loglevel: - logformat = f"%(asctime)s %(levelname)s %(name)s [task={ctx.task_id}]: %(message)s" - setup_logging(loglevel=loglevel, logformat=logformat, force=True) + loglevel = os.getenv(BLUEETL_SUBPROCESS_LOGGING_LEVEL) or ctx.loglevel + logformat = f"%(asctime)s %(levelname)s %(name)s [task={ctx.task_id}]: %(message)s" + setup_logging(loglevel=loglevel, logformat=logformat, force=True) @staticmethod def _setup_seed(ctx: TaskContext) -> None: @@ -120,3 +120,35 @@ def run_parallel( if shutdown_executor and (not backend or backend == "loky"): # shutdown the pool of processes used by loky get_reusable_executor().shutdown(wait=True) + + +def isolated(func): + """Isolate a function to be executed in a separate process. + + Notes: + - it uses loky instead of multiprocessing to be able to use joblib inside the subprocess. + - it can work as a decorator, if desired. + + Args: + func (function): function to isolate. + + Returns: + the isolated function. + """ + + def func_isolated(*args, **kwargs): + task = Task(partial(func, *args, **kwargs)) + ctx = TaskContext( + task_id=0, + loglevel=L.getEffectiveLevel(), + seed=None, + ppid=os.getpid(), + ) + executor = get_reusable_executor(max_workers=1, reuse=False) + try: + future = executor.submit(task, ctx) + return future.result() + finally: + executor.shutdown(wait=True) + + return func_isolated diff --git a/tests/test_parallel.py b/tests/test_parallel.py new file mode 100644 index 0000000..8262a0e --- /dev/null +++ b/tests/test_parallel.py @@ -0,0 +1,22 @@ +from functools import partial + +import pytest + +from blueetl_core import parallel as test_module + + +def _myfunc(n): + return n * n + + +@pytest.mark.parametrize("jobs", [1, 3, None]) +def test_run_parallel(jobs): + tasks = [test_module.Task(partial(_myfunc, n=i)) for i in range(3)] + result = test_module.run_parallel(tasks=tasks, jobs=jobs) + assert result == [0, 1, 4] + + +def test_isolated(): + func = test_module.isolated(_myfunc) + result = func(n=2) + assert result == 4