Skip to content

Commit

Permalink
Add blueetl_core.parallel.isolated() (#10)
Browse files Browse the repository at this point in the history
Other: If the env variable ``BLUEETL_SUBPROCESS_LOGGING_LEVEL`` is empty or not defined, then the log level of the subprocesses is inherited from the parent.
  • Loading branch information
GianlucaFicarelli authored Apr 5, 2024
1 parent 9a4d5ac commit 9b7c7ba
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 5 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
Changelog
=========

Version 0.2.1
-------------

New Features
~~~~~~~~~~~~

- Add ``blueetl_core.parallel.isolated()`` to execute a function in a separate subprocess.

Improvements
~~~~~~~~~~~~

- If the env variable ``BLUEETL_SUBPROCESS_LOGGING_LEVEL`` is empty or not defined, then the log level of the subprocesses is inherited from the parent.

Version 0.2.0
-------------

Expand Down
3 changes: 2 additions & 1 deletion src/blueetl_core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
BLUEETL_JOBLIB_JOBS = "BLUEETL_JOBLIB_JOBS"
# JobLib backend (loky, multiprocessing, threading). If not overridden, it uses by default: loky
BLUEETL_JOBLIB_BACKEND = "BLUEETL_JOBLIB_BACKEND"
# Subprocess logging level. If empty or not defined, fallback to the default level WARNING.
# Logging level to be configured in subprocesses.
# If empty or not defined, use the effective log level of the parent process (recommended).
BLUEETL_SUBPROCESS_LOGGING_LEVEL = "BLUEETL_SUBPROCESS_LOGGING_LEVEL"
39 changes: 35 additions & 4 deletions src/blueetl_core/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
from collections.abc import Iterable
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Optional

import numpy as np
Expand Down Expand Up @@ -42,10 +43,9 @@ def __init__(self, func: Callable) -> None:

def _setup_logging(self, ctx: TaskContext) -> None:
"""Initialize logging in a subprocess."""
loglevel = os.getenv(BLUEETL_SUBPROCESS_LOGGING_LEVEL)
if loglevel:
logformat = f"%(asctime)s %(levelname)s %(name)s [task={ctx.task_id}]: %(message)s"
setup_logging(loglevel=loglevel, logformat=logformat, force=True)
loglevel = os.getenv(BLUEETL_SUBPROCESS_LOGGING_LEVEL) or ctx.loglevel
logformat = f"%(asctime)s %(levelname)s %(name)s [task={ctx.task_id}]: %(message)s"
setup_logging(loglevel=loglevel, logformat=logformat, force=True)

@staticmethod
def _setup_seed(ctx: TaskContext) -> None:
Expand Down Expand Up @@ -120,3 +120,34 @@ def run_parallel(
if shutdown_executor and (not backend or backend == "loky"):
# shutdown the pool of processes used by loky
get_reusable_executor().shutdown(wait=True)


def isolated(func):
"""Isolate a function to be executed in a separate process.
- It uses loky instead of multiprocessing to be able to use joblib inside the subprocess.
- It can work as a decorator, if desired.
Args:
func (function): function to isolate.
Returns:
the isolated function.
"""

def func_isolated(*args, **kwargs):
task = Task(partial(func, *args, **kwargs))
ctx = TaskContext(
task_id=0,
loglevel=L.getEffectiveLevel(),
seed=None,
ppid=os.getpid(),
)
executor = get_reusable_executor(max_workers=1, reuse=False)
try:
future = executor.submit(task, ctx)
return future.result()
finally:
executor.shutdown(wait=True)

return func_isolated
22 changes: 22 additions & 0 deletions tests/test_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from functools import partial

import pytest

from blueetl_core import parallel as test_module


def _myfunc(n):
return n * n


@pytest.mark.parametrize("jobs", [1, 3, None])
def test_run_parallel(jobs):
tasks = [test_module.Task(partial(_myfunc, n=i)) for i in range(3)]
result = test_module.run_parallel(tasks=tasks, jobs=jobs)
assert result == [0, 1, 4]


def test_isolated():
func = test_module.isolated(_myfunc)
result = func(n=2)
assert result == 4

0 comments on commit 9b7c7ba

Please sign in to comment.