Restructure Parallel Runner #22

Open · wants to merge 1 commit into base: main
124 changes: 101 additions & 23 deletions src/irace/__init__.py
@@ -11,6 +11,9 @@
from rpy2 import rinterface as ri
from rpy2.rinterface_lib import na_values
from rpy2.rinterface_lib.sexp import NACharacterType
+from multiprocessing import Queue, Process
+import json
Contributor: Is this unused?

Collaborator (author): Yeah, it should belong to #30.

+from rpy2.rinterface_lib.sexp import NACharacterType
Contributor: Duplicated?

Collaborator (author): Will remove it.

irace_converter = ro.default_converter + numpy2ri.converter + pandas2ri.converter

@@ -71,30 +74,66 @@ def r_to_python(data):
        raise KeyError(f'Could not proceed, type {type(data)} of rclass ({data.rclass[0]}) is not defined!')
    return data # We reached the end of recursion

-def make_target_runner(py_target_runner):
+def run_with_catch(f, args, kwargs):
+    try:
+        res = f(*args, **kwargs)
+    except:
+        res = dict(error=traceback.format_exc())
+    return res
+
+def make_target_runner_parallel(aq: Queue, rq: Queue, check_output_target_runner, scenario_a, target_runner, has_worker):
    @ri.rternalize
-    def tmp_r_target_runner(experiment, scenario):
-        py_experiment = r_to_python(experiment)
-        py_scenario = r_to_python(scenario)
-        # FIXME: How to skip this conversion?
-        py_experiment['configuration'] = py_experiment['configuration'].to_dict('records')[0]
-        # FIXME: We should also filter 'switches'
-        # Filter all the NaN from keys in the dictionary
-        py_experiment['configuration'] = OrderedDict(
-            (k,v) for k,v in py_experiment['configuration'].items() if not pd.isna(v)
-        )
+    def parallel_runner(*args, **kwargs):
        try:
-            with localconverter(irace_converter_hack):
-                ret = py_target_runner(py_experiment, py_scenario)
+            experiments = list(r_to_python(args[0]).values())
+            n = len(experiments)
+
+            ans = [None for i in range(n)]
+            for i, experiment in enumerate(experiments):
+                # FIXME: How to skip this conversion?
+                experiment['configuration'] = experiment['configuration'].to_dict('records')[0]
+                # FIXME: We should also filter 'switches'
+                # Filter all the NaN from keys in the dictionary
+                experiment['configuration'] = OrderedDict(
+                    (k,v) for k,v in experiment['configuration'].items() if not pd.isna(v)
+                )
+                if has_worker:
+                    aq.put((i, experiment, scenario_a[0]))
+                else:
+                    res = run_with_catch(target_runner, (experiment, scenario_a[0]), {})
+                    res = check_output_target_runner(ListVector(res), scenario_a[1])
+                    ans[i] = res
+
+            if has_worker:
+                for _ in range(n):
+                    i, res = rq.get()
+                    with localconverter(irace_converter_hack):
+                        res = check_output_target_runner(ListVector(res), scenario_a[1])
+                    ans[i] = res
+
+            return ListVector(zip(range(len(ans)), ans))
        except:
            # rpy2 swallows traceback from any r.rternalize function so we print it manually.
            traceback.print_exc()
-            ret = dict(error=traceback.format_exc())
-        return ListVector(ret)
-    return tmp_r_target_runner
+            raise
+    return parallel_runner

-def check_windows(scenario):
-    if scenario.get('parallel', 1) != 1 and os.name == 'nt':
-        raise NotImplementedError('Parallel running on windows is not supported yet. Follow https://github.com/auto-optimization/iracepy/issues/16 for updates. Alternatively, use Linux or MacOS or the irace R package directly.')
+def runner_worker(target_runner, aq: Queue, rq: Queue):
+    while True:
+        i, experiment, scenario = aq.get()
+        if i == -1:
+            break
+        rq.put((i, run_with_catch(target_runner, (experiment, scenario), {})))

def check_unsupported_scenarios(scenario):
    if scenario.get('targetRunnerRetries', 1) > 1:
        raise NotImplementedError("targetRunnerRetries is not yet supported by the python binding although it's supported in the irace R package. We recommend implementing retries in your target runner.")
    if 'targetRunnerParallel' in scenario:
        raise NotImplementedError("targetRunnerParallel is not yet supported. If you need this feature, consider opening an issue to show us some people actually want to use this.")

+def run_irace(irace, args, q: Queue):
Contributor: What is this function doing? It does not seem to be used anywhere.

Collaborator (author): Sorry, forgot to delete this.

+    r = irace(*args)
+    q.put(r)

class irace:
    # Imported R package

@@ -111,8 +150,21 @@ def __init__(self, scenario, parameters_table, target_runner):
        self.parameters = self._pkg.readParameters(text = parameters_table, digits = scenario.get('digits', 4))
        # IMPORTANT: We need to save this in a variable or it will be garbage
        # collected by Python and crash later.
-        self.r_target_runner = make_target_runner(target_runner)
-        check_windows(scenario)
+        self.target_runner = target_runner
+        self.worker_count = max(self.scenario.get('parallel', 1), 1)
+        if self.worker_count != 1:
+            self.target_aq = Queue()
+            self.target_rq = Queue()
+        else:
+            self.target_aq = None
+            self.target_rq = None
+        self.workers: list[Process] = []
+        if self.worker_count != 1:
Contributor: Why not move this code within the above if self.worker_count != 1?
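A merged version might look like this (untested sketch of the suggested rearrangement, not code from the PR):

```python
# Hypothetical __init__ fragment with a single worker_count branch.
self.workers: list[Process] = []
if self.worker_count != 1:
    self.target_aq = Queue()
    self.target_rq = Queue()
    # Spawn the persistent worker pool once, up front.
    for _ in range(self.worker_count):
        self.workers.append(
            Process(target=runner_worker,
                    args=(self.target_runner, self.target_aq, self.target_rq)))
    for worker in self.workers:
        worker.start()
else:
    self.target_aq = None
    self.target_rq = None
```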

+            for i in range(self.worker_count):
+                self.workers.append(Process(target=runner_worker, args=(self.target_runner, self.target_aq, self.target_rq)))
+            for worker in self.workers:
+                worker.start()


    def read_configurations(self, filename=None, text=None):
        if text is None:

@@ -146,11 +198,37 @@ def set_initial(self, x):

    def run(self):
        """Returns a Pandas DataFrame, one column per parameter; the row index is the configuration ID."""
-        self.scenario['targetRunner'] = self.r_target_runner
+        scenario_a = [None, None]
+        self.r_target_runner_parallel = make_target_runner_parallel(self.target_aq, self.target_rq, self._pkg.check_output_target_runner, scenario_a, self.target_runner, self.worker_count != 1)
+        self.scenario['targetRunnerParallel'] = self.r_target_runner_parallel
Contributor: This seems to always use self.scenario['targetRunnerParallel'], but it should only be used if the user actually wants to run in parallel.

Collaborator (author): Good catch. I thought to just spawn a worker regardless, but that might create a problem on platforms where the number of processes a user may run is limited. I'll change it.

Collaborator (author): In the change, I made targetRunnerParallel call the function directly if parallel is 0 or 1. Hope this is ok.

Collaborator (author): If everything is otherwise the same, I would prefer to use targetRunnerParallel instead of targetRunner even when the user does not run in parallel, because then I can use a pure Python function and pass pure Python objects through the queues.

Contributor: I still think that your targetRunnerParallel is much more complex than the code it replaces. Is the main benefit avoiding conversions between R and Python to increase speed? Forcing users to use targetRunnerParallel also means they cannot use any of the other parallelization mechanisms in irace, such as MPI/qsub or their own targetRunnerParallel; they cannot use the automatic retries; and there may be other things I have not even thought of.

Thus, I would rather add an option to irace.init(), for example python_parallel=True, that selects either your Python-based targetRunnerParallel or the plain R implementation, without changing targetRunnerParallel as it is now.

Collaborator (author): To be honest, it's in part that I don't understand how to use MPI and don't see its benefit. I started this because I thought it would be faster, and there was also a garbage-collection problem with the old code that I can't reproduce and whose cause I don't know, but I suspected that spawning new Python threads from an embedded R might trigger some edge case in the GC.

It is possible to avoid conversion between R and Python without this targetRunnerParallel. But does MPI create a new thread for each target-runner run, or does it reuse the same thread like a worker pool? If it creates a new one every time, it might have more overhead than the worker queue I wrote here.

The one remaining benefit is that it fixes the crash on Windows (#16). I think the proper way would be to fix it in the irace R code / MPI / rpy2, but the error seems to come from some low-level dynamically linked library / shared symbol, and I don't think I have the expertise to understand and fix that.

> Thus, I would rather add an option to irace.init(), for example python_parallel=True, that selects either your Python-based targetRunnerParallel or the plain R implementation, without changing targetRunnerParallel as it is now.

I would actually not prefer this. It creates confusion for the user, and it means maintaining two separate code paths for things like #30. So I propose that we either close this PR and declare it a failed experiment, or we replicate the irace features here: support MPI/qsub, allow users to set their own targetRunnerParallel, and implement retries.
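For illustration, the user-side retries that the diff's targetRunnerRetries message recommends could be a thin wrapper around the target runner (untested sketch, names hypothetical, not part of this PR):

```python
def with_retries(target_runner, max_retries=3):
    # Retry a failing target runner a fixed number of times before
    # letting the last exception propagate to irace.
    def wrapped(experiment, scenario):
        last_exc = None
        for _ in range(max_retries):
            try:
                return target_runner(experiment, scenario)
            except Exception as exc:  # deliberate catch-all: any failure triggers a retry
                last_exc = exc
        raise last_exc
    return wrapped

# Usage: tuner = irace(scenario, params, with_retries(target_runner))
```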


        with localconverter(irace_converter_hack):
-            res = self._pkg.irace(ListVector(self.scenario), self.parameters)
+            self.r_scenario = self._pkg.checkScenario(ListVector(self.scenario))
+        self.scenario = r_to_python(self.r_scenario)
+        self.scenario.pop('targetRunnerParallel', None)
+        scenario_a[0] = self.scenario
+        scenario_a[1] = self.r_scenario
Contributor: This may need a comment to explain what is going on: you pass scenario_a to make_target_runner_parallel with None values, but then assign values here? How does that work?

Collaborator (author): make_target_runner_parallel needs to know the scenario, but to build the scenario (pass checkScenario), targetRunnerParallel must already be set, which creates a chicken-and-egg problem. scenario_a acts like a pointer: I give it a "null pointer" first and set the value later.

+        try:
+            with localconverter(irace_converter_hack):
+                res = self._pkg.irace(self.r_scenario, self.parameters)
+            print(res)
+        except:
+            self.cleanup(True)
+            raise
+        self.cleanup(False)
        with localconverter(irace_converter):
            res = ro.conversion.rpy2py(res)
        # Remove metadata columns.
        res = res.loc[:, ~res.columns.str.startswith('.')]
        return res

+    def cleanup(self, forced):
+        if self.worker_count == 1:
+            return
+        if forced:
+            for worker in self.workers:
+                worker.terminate()
+        for i in range(self.worker_count):
+            self.target_aq.put((-1, None, None))
+        self.target_aq.close()
+        self.target_rq.close()
6 changes: 0 additions & 6 deletions tests/test_daemon_process.py
@@ -4,16 +4,12 @@
from irace import irace
import pandas as pd
from multiprocessing import Process
-import os

-import json
Contributor: Why is this needed here?

Collaborator (author): I discovered that daemonic processes cannot start child processes, meaning the target runner cannot spawn new processes. Python's multiprocessing.Pool uses daemonic processes, so this test is here to prevent a regression.
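For reference, the restriction is easy to reproduce on CPython (untested sketch, not part of the test file):

```python
from multiprocessing import Process

def child():
    # Inside a daemonic process this raises:
    # AssertionError: daemonic processes are not allowed to have children
    Process(target=print, args=(1,)).start()

if __name__ == '__main__':
    p = Process(target=child)
    p.daemon = True  # multiprocessing.Pool marks its workers daemonic like this
    p.start()
    p.join()
```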

def target_runner(experiment, scenario):
    Process(target=print, args=(1,)).start()
    return dict(cost=experiment['configuration']['one'])

-def is_windows():
-    return os.name == 'nt'

params = '''
one "" r (0, 1)
'''
@@ -32,8 +28,6 @@


def test():
-    if is_windows():
-        return
    tuner = irace(scenario, params, target_runner)
    tuner.set_initial(defaults)
    best_conf = tuner.run()
24 changes: 1 addition & 23 deletions tests/test_dual_annealing.py
@@ -37,42 +37,20 @@ def target_runner(experiment, scenario, lb = LB, ub = UB):

# See https://mlopez-ibanez.github.io/irace/reference/defaultScenario.html

-if os.name == 'nt':
-    parallel = 1
-else:
-    parallel = 2

scenario = dict(
    instances = instances,
    maxExperiments = 180,
    debugLevel = 3,
    seed = 123,
    digits = 5,
-    parallel= parallel, # It can run in parallel !
+    parallel= 2, # It can run in parallel !
    logFile = "")

def run_irace(scenario, parameters_table, target_runner):
    tuner = irace(scenario, parameters_table, target_runner)
    tuner.set_initial_from_str(default_values)
    best_confs = tuner.run()
    # FIXME: assert type Pandas DataFrame
    print(best_confs)

-def test_fail_windows():
-    # FIXME: remove when https://github.com/auto-optimization/iracepy/issues/16 is closed.
-    if os.name == 'nt':
-        with pytest.raises(NotImplementedError):
-            scenario = dict(
-                instances = instances,
-                maxExperiments = 180,
-                debugLevel = 3,
-                seed = 123,
-                digits = 5,
-                parallel= 2, # It can run in parallel !
-                logFile = "")
-            tuner = irace(scenario, parameters_table, target_runner)
-            tuner.run()


def test_run():
    run_irace(scenario, parameters_table, target_runner)
9 changes: 0 additions & 9 deletions tests/test_errors.py
@@ -4,16 +4,12 @@
from multiprocessing import Process, Queue
from threading import Timer, Thread, Event
from time import sleep
-import os

BASE_TIME = 100

def target_runner(experiment, scenario):
    raise ValueError()

-def is_windows():
-    return os.name == 'nt'

params = '''
one "" r (0, 1)
'''
@@ -58,16 +54,13 @@ def test_no_hang1():
    assert not killed

def test_no_hang2():
-    if is_windows():
-        return
    q = Queue()
    p = Process(target=start_irace, args=(q, scenario2))
    p.start()
    t1 = Timer(BASE_TIME, sigterm_process, args=(p,))
    t2 = Timer(BASE_TIME + 1, sigkill_process, args=(p,))
    t1.start()
    t2.start()
-    print("jfjfjfj")
    for i in range(BASE_TIME + 2):
        sleep(1)
        if not p.is_alive():
@@ -113,8 +106,6 @@ def test_correct_exit1():
    assert not q.empty()

def test_correct_exit2():
-    if is_windows():
-        return
    q = Queue()
    p = Process(target=start_irace, args=(q, scenario2))
    p.start()