Skip to content

Commit

Permalink
restructure parallel runner
Browse files Browse the repository at this point in the history
  • Loading branch information
DE0CH committed Dec 23, 2022
1 parent b2177a6 commit 5bf567a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 60 deletions.
119 changes: 96 additions & 23 deletions src/irace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rpy2.robjects.conversion import localconverter
from rpy2 import rinterface as ri
from rpy2.rinterface_lib import na_values
from multiprocessing import Queue, Process

@ro.default_converter.rpy2py.register(ri.IntSexpVector)
def to_int(obj):
Expand Down Expand Up @@ -77,29 +78,65 @@ def r_to_python(data):
raise KeyError(f'Could not proceed, type {type(data)} of rclass ({data.rclass[0]}) is not defined!')
return data # We reached the end of recursion

def make_target_runner(py_target_runner):
def run_with_catch(f, args, kwargs):
try:
res = f(*args, **kwargs)
except:
res = dict(error=traceback.format_exc())
return res

def make_target_runner_parallel(aq: Queue, rq: Queue, check_output_target_runner, scenario_a, target_runner, has_worker):
@ri.rternalize
def tmp_r_target_runner(experiment, scenario):
py_experiment = r_to_python(experiment)
py_scenario = r_to_python(scenario)
# FIXME: How to skip this conversion?
py_experiment['configuration'] = py_experiment['configuration'].to_dict('records')[0]
# FIXME: We should also filter 'switches'
# Filter all the NaN from keys in the dictionary
py_experiment['configuration'] = OrderedDict(
(k,v) for k,v in py_experiment['configuration'].items() if not pd.isna(v)
)
def parallel_runner(*args, **kwargs):
try:
ret = py_target_runner(py_experiment, py_scenario)
experiments = list(r_to_python(args[0]).values())
n = len(experiments)

ans = [None for i in range(n)]
for i, experiment in enumerate(experiments):
# FIXME: How to skip this conversion?
experiment['configuration'] = experiment['configuration'].to_dict('records')[0]
# FIXME: We should also filter 'switches'
# Filter all the NaN from keys in the dictionary
experiment['configuration'] = OrderedDict(
(k,v) for k,v in experiment['configuration'].items() if not pd.isna(v)
)
if has_worker:
aq.put((i, experiment, scenario_a[0]))
else:
res = run_with_catch(target_runner, (experiment, scenario_a[0]), {})
res = check_output_target_runner(ListVector(res), scenario_a[1])
ans[i] = res

if has_worker:
for _ in range(n):
i, res = rq.get()
res = check_output_target_runner(ListVector(res), scenario_a[1])
ans[i] = res

return ListVector(zip(range(len(ans)), ans))
except:
# rpy2 swallows traceback from any r.rternalize function so we print it manually.
traceback.print_exc()
ret = dict(error=traceback.format_exc())
return ListVector(ret)
return tmp_r_target_runner

def check_windows(scenario):
if scenario.get('parallel', 1) != 1 and os.name == 'nt':
raise NotImplementedError('Parallel running on windows is not supported yet. Follow https://github.com/auto-optimization/iracepy/issues/16 for updates. Alternatively, use Linux or MacOS or the irace R package directly.')
raise
return parallel_runner

def runner_worker(target_runner, aq: Queue, rq: Queue):
while True:
i, experiment, scenario = aq.get()
if i == -1:
break
rq.put((i, run_with_catch(target_runner, (experiment, scenario), {})))

def check_unsupported_scenarios(scenario):
if scenario.get('targetRunnerRetries', 1) > 1:
raise NotImplementedError("targetRunnerRetries is not yet supported by the python binding although it's supported in the irace R package. We recommend you to implement retries in your target runner.")
if 'targetRunnerParallel' in scenario:
raise NotImplementedError("targetRunnerParallel is not yet supported. If you need this feature, consider opening an issue to show us some people actually want to use this.")

def run_irace(irace, args, q: Queue):
r = irace(*args)
q.put(r)

class irace:
# Imported R package
Expand All @@ -115,8 +152,21 @@ def __init__(self, scenario, parameters_table, target_runner):
self.parameters = self._pkg.readParameters(text = parameters_table, digits = scenario.get('digits', 4))
# IMPORTANT: We need to save this in a variable or it will be garbage
# collected by Python and crash later.
self.r_target_runner = make_target_runner(target_runner)
check_windows(scenario)
self.target_runner = target_runner
self.worker_count = max(self.scenario.get('parallel', 1), 1)
if self.worker_count != 1:
self.target_aq = Queue()
self.target_rq = Queue()
else:
self.target_aq = None
self.target_rq = None
self.workers: list[Process] = []
if self.worker_count != 1:
for i in range(self.worker_count):
self.workers.append(Process(target=runner_worker, args=(self.target_runner, self.target_aq, self.target_rq)))
for worker in self.workers:
worker.start()


def read_configurations(self, filename=None, text=None):
if text is None:
Expand Down Expand Up @@ -148,10 +198,33 @@ def set_initial(self, x):

def run(self):
"""Returns a Pandas DataFrame, one column per parameter and the row index are the configuration ID."""
self.scenario['targetRunner'] = self.r_target_runner
res = self._pkg.irace(ListVector(self.scenario), self.parameters)
scenario_a = [None, None]
self.r_target_runner_parallel = make_target_runner_parallel(self.target_aq, self.target_rq, self._pkg.check_output_target_runner, scenario_a, self.target_runner, self.worker_count != 1)
self.scenario['targetRunnerParallel'] = self.r_target_runner_parallel
self.r_scenario = self._pkg.checkScenario(ListVector(self.scenario))
self.scenario = r_to_python(self.r_scenario)
self.scenario.pop('targetRunnerParallel', None)
scenario_a[0] = self.scenario
scenario_a[1] = self.r_scenario
try:
res = self._pkg.irace(self.r_scenario, self.parameters)
except:
self.cleanup(True)
raise
self.cleanup(False)
with localconverter(irace_converter):
res = ro.conversion.rpy2py(res)
# Remove metadata columns.
res = res.loc[:, ~res.columns.str.startswith('.')]
return res

def cleanup(self, forced):
if self.worker_count == 1:
return
if forced:
for worker in self.workers:
worker.terminate()
for i in range(self.worker_count):
self.target_aq.put((-1, None, None))
self.target_aq.close()
self.target_rq.close()
6 changes: 0 additions & 6 deletions tests/test_daemon_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
from irace import irace
import pandas as pd
from multiprocessing import Process
import os

import json
def target_runner(experiment, scenario):
Process(target=print, args=(1,)).start()
return dict(cost=experiment['configuration']['one'])

def is_windows():
return os.name == 'nt'

params = '''
one "" r (0, 1)
'''
Expand All @@ -32,8 +28,6 @@ def is_windows():


def test():
if is_windows():
return
tuner = irace(scenario, params, target_runner)
tuner.set_initial(defaults)
best_conf = tuner.run()
Expand Down
23 changes: 1 addition & 22 deletions tests/test_dual_annealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,13 @@ def target_runner(experiment, scenario, lb = LB, ub = UB):

# See https://mlopez-ibanez.github.io/irace/reference/defaultScenario.html

if os.name == 'nt':
parallel = 1
else:
parallel = 2

scenario = dict(
instances = instances,
maxExperiments = 180,
debugLevel = 3,
seed = 123,
digits = 5,
parallel= parallel, # It can run in parallel !
parallel= 2, # It can run in parallel !
logFile = "")

def test_run():
Expand All @@ -56,19 +51,3 @@ def test_run():
best_confs = tuner.run()
# FIXME: assert type Pandas DataFrame
print(best_confs)

def test_fail_windows():
# FIXME: remove when https://github.com/auto-optimization/iracepy/issues/16 is closed.
if os.name == 'nt':
with pytest.raises(NotImplementedError):
scenario = dict(
instances = instances,
maxExperiments = 180,
debugLevel = 3,
seed = 123,
digits = 5,
parallel= 2, # It can run in parallel !
logFile = "")
tuner = irace(scenario, parameters_table, target_runner)
tuner.run()

9 changes: 0 additions & 9 deletions tests/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
from multiprocessing import Process, Queue
from threading import Timer, Thread, Event
from time import sleep
import os

BASE_TIME = 100

def target_runner(experiment, scenario):
raise ValueError()

def is_windows():
return os.name == 'nt'

params = '''
one "" r (0, 1)
'''
Expand Down Expand Up @@ -58,16 +54,13 @@ def test_no_hang1():
assert not killed

def test_no_hang2():
if is_windows():
return
q = Queue()
p = Process(target=start_irace, args=(q, scenario2))
p.start()
t1 = Timer(BASE_TIME, sigterm_process, args=(p,))
t2 = Timer(BASE_TIME + 1, sigkill_process, args=(p,))
t1.start()
t2.start()
print("jfjfjfj")
for i in range(BASE_TIME + 2):
sleep(1)
if not p.is_alive():
Expand Down Expand Up @@ -113,8 +106,6 @@ def test_correct_exit1():
assert not q.empty()

def test_correct_exit2():
if is_windows():
return
q = Queue()
p = Process(target=start_irace, args=(q, scenario2))
p.start()
Expand Down

0 comments on commit 5bf567a

Please sign in to comment.