Skip to content

Commit

Permalink
restructure parallel runner
Browse files Browse the repository at this point in the history
  • Loading branch information
DE0CH committed Dec 21, 2022
1 parent 1762952 commit b3604a2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 44 deletions.
94 changes: 72 additions & 22 deletions src/irace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rpy2.robjects.conversion import localconverter
from rpy2 import rinterface as ri
from rpy2.rinterface_lib import na_values
from multiprocessing import Queue, Process

@ro.default_converter.rpy2py.register(ri.IntSexpVector)
def to_int(obj):
Expand Down Expand Up @@ -77,29 +78,55 @@ def r_to_python(data):
raise KeyError(f'Could not proceed, type {type(data)} of rclass ({data.rclass[0]}) is not defined!')
return data # We reached the end of recursion

def make_target_runner(py_target_runner):
def make_target_runner_parallel(aq: Queue, rq: Queue, check_output_target_runner, scenario_a):
@ri.rternalize
def tmp_r_target_runner(experiment, scenario):
py_experiment = r_to_python(experiment)
py_scenario = r_to_python(scenario)
# FIXME: How to skip this conversion?
py_experiment['configuration'] = py_experiment['configuration'].to_dict('records')[0]
# FIXME: We should also filter 'switches'
# Filter all the NaN from keys in the dictionary
py_experiment['configuration'] = OrderedDict(
(k,v) for k,v in py_experiment['configuration'].items() if not pd.isna(v)
)
def parallel_runner(*args, **kwargs):
try:
ret = py_target_runner(py_experiment, py_scenario)
experiments = list(r_to_python(args[0]).values())
n = len(experiments)

for i, experiment in enumerate(experiments):
# FIXME: How to skip this conversion?
experiment['configuration'] = experiment['configuration'].to_dict('records')[0]
# FIXME: We should also filter 'switches'
# Filter all the NaN from keys in the dictionary
experiment['configuration'] = OrderedDict(
(k,v) for k,v in experiment['configuration'].items() if not pd.isna(v)
)
aq.put((i, experiment, scenario_a[0]))
ans = [None for i in range(n)]
for _ in range(n):
i, res = rq.get()
res = check_output_target_runner(ListVector(res), scenario_a[1])
ans[i] = res
return ListVector(zip(range(len(ans)), ans))
except:
# rpy2 swallows traceback from any r.rternalize function so we print it manually.
traceback.print_exc()
ret = dict(error=traceback.format_exc())
return ListVector(ret)
return tmp_r_target_runner
raise
return parallel_runner

def runner_worker(target_runner, aq: Queue, rq: Queue):
while True:
i, experiment, scenario = aq.get()
if i == -1:
break
try:
res = target_runner(experiment, scenario)
except:
res = dict(error=traceback.format_exc())
finally:
rq.put((i, res))

def check_windows(scenario):
if scenario.get('parallel', 1) != 1 and os.name == 'nt':
raise NotImplementedError('Parallel running on windows is not supported yet. Follow https://github.com/auto-optimization/iracepy/issues/16 for updates. Alternatively, use Linux or MacOS or the irace R package directly.')
def check_unsupported_scenarios(scenario):
if scenario.get('targetRunnerRetries', 1) > 1:
raise NotImplementedError("targetRunnerRetries is not yet supported by the python binding although it's supported in the irace R package. We recommend you to implement retries in your target runner.")
if 'targetRunnerParallel' in scenario:
raise NotImplementedError("targetRunnerParallel is not yet supported. If you need this feature, consider opening an issue to show us some people actually want to use this.")

def run_irace(irace, args, q: Queue):
r = irace(*args)
q.put(r)

class irace:
# Imported R package
Expand All @@ -115,8 +142,16 @@ def __init__(self, scenario, parameters_table, target_runner):
self.parameters = self._pkg.readParameters(text = parameters_table, digits = scenario.get('digits', 4))
# IMPORTANT: We need to save this in a variable or it will be garbage
# collected by Python and crash later.
self.r_target_runner = make_target_runner(target_runner)
check_windows(scenario)
self.target_runner = target_runner
self.target_aq = Queue()
self.target_rq = Queue()
self.worker_count = max(self.scenario.get('parallel', 1), 1)
self.workers: list[Process] = []
for i in range(self.worker_count):
self.workers.append(Process(target=runner_worker, args=(self.target_runner, self.target_aq, self.target_rq)))
for worker in self.workers:
worker.start()


def read_configurations(self, filename=None, text=None):
if text is None:
Expand Down Expand Up @@ -148,10 +183,25 @@ def set_initial(self, x):

def run(self):
"""Returns a Pandas DataFrame, one column per parameter and the row index are the configuration ID."""
self.scenario['targetRunner'] = self.r_target_runner
res = self._pkg.irace(ListVector(self.scenario), self.parameters)
scenario_a = [None, None]
self.r_target_runner_parallel = make_target_runner_parallel(self.target_aq, self.target_rq, self._pkg.check_output_target_runner, scenario_a)
self.scenario['targetRunnerParallel'] = self.r_target_runner_parallel
self.r_scenario = self._pkg.checkScenario(ListVector(self.scenario))
self.scenario = r_to_python(self.r_scenario)
self.scenario.pop('targetRunnerParallel', None)
scenario_a[0] = self.scenario
scenario_a[1] = self.r_scenario
try:
res = self._pkg.irace(self.r_scenario, self.parameters)
finally:
self.cleanup()
with localconverter(irace_converter):
res = ro.conversion.rpy2py(res)
# Remove metadata columns.
res = res.loc[:, ~res.columns.str.startswith('.')]
return res
def cleanup(self):
for i in range(self.worker_count):
self.target_aq.put((-1, None, None))
self.target_aq.close()
self.target_rq.close()
23 changes: 1 addition & 22 deletions tests/test_dual_annealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,13 @@ def target_runner(experiment, scenario, lb = LB, ub = UB):

# See https://mlopez-ibanez.github.io/irace/reference/defaultScenario.html

if os.name == 'nt':
parallel = 1
else:
parallel = 2

scenario = dict(
instances = instances,
maxExperiments = 180,
debugLevel = 3,
seed = 123,
digits = 5,
parallel= parallel, # It can run in parallel !
parallel= 2, # It can run in parallel !
logFile = "")

def test_run():
Expand All @@ -56,19 +51,3 @@ def test_run():
best_confs = tuner.run()
# FIXME: assert type Pandas DataFrame
print(best_confs)

def test_fail_windows():
# FIXME: remove when https://github.com/auto-optimization/iracepy/issues/16 is closed.
if os.name == 'nt':
with pytest.raises(NotImplementedError):
scenario = dict(
instances = instances,
maxExperiments = 180,
debugLevel = 3,
seed = 123,
digits = 5,
parallel= 2, # It can run in parallel !
logFile = "")
tuner = irace(scenario, parameters_table, target_runner)
tuner.run()

0 comments on commit b3604a2

Please sign in to comment.