From 15f8cb61d6729a0ce479317956dbad3be6748cec Mon Sep 17 00:00:00 2001 From: Deyao Chen Date: Tue, 13 Dec 2022 13:22:29 +0000 Subject: [PATCH] restructure parallel runner --- src/irace/__init__.py | 97 ++++++++++++++++++++++++++++-------- tests/test_dual_annealing.py | 23 +-------- 2 files changed, 76 insertions(+), 44 deletions(-) diff --git a/src/irace/__init__.py b/src/irace/__init__.py index 0aa8897..56ce496 100644 --- a/src/irace/__init__.py +++ b/src/irace/__init__.py @@ -14,6 +14,7 @@ from rpy2.robjects.conversion import localconverter from rpy2 import rinterface as ri from rpy2.rinterface_lib import na_values +from multiprocessing import Queue, Process @ro.default_converter.rpy2py.register(ri.IntSexpVector) def to_int(obj): @@ -80,29 +81,58 @@ def r_to_python(data): raise KeyError(f'Could not proceed, type {type(data)} of rclass ({data.rclass[0]}) is not defined!') return data # We reached the end of recursion -def make_target_runner(py_target_runner): +def make_target_runner_parallel(aq: Queue, rq: Queue, check_output_target_runner, scenario_a): @ri.rternalize - def tmp_r_target_runner(experiment, scenario): - py_experiment = r_to_python(experiment) - py_scenario = r_to_python(scenario) - # FIXME: How to skip this conversion? - py_experiment['configuration'] = py_experiment['configuration'].to_dict('records')[0] - # FIXME: We should also filter 'switches' - # Filter all the NaN from keys in the dictionary - py_experiment['configuration'] = OrderedDict( - (k,v) for k,v in py_experiment['configuration'].items() if not pd.isna(v) - ) + def parallel_runner(*args, **kwargs): try: - ret = py_target_runner(py_experiment, py_scenario) + experiments = list(r_to_python(args[0]).values()) + n = len(experiments) + + for i, experiment in enumerate(experiments): + # FIXME: How to skip this conversion? + experiment['configuration'] = experiment['configuration'].to_dict('records')[0] + # FIXME: We should also filter 'switches' + # Filter all the NaN from keys in the dictionary + experiment['configuration'] = OrderedDict( + (k,v) for k,v in experiment['configuration'].items() if not pd.isna(v) + ) + aq.put((i, experiment, scenario_a[0])) + ans = [None for i in range(n)] + for _ in range(n): + i, res = rq.get() + try: + res = check_output_target_runner(ListVector(res), scenario_a[1]) + except embedded.RRuntimeError: + ro.r('stop()') + ans[i] = res + return ListVector(zip(range(len(ans)), ans)) except: + # rpy2 swallows traceback from any r.rternalize function so we print it manually. traceback.print_exc() - ret = dict(error=traceback.format_exc()) - return ListVector(ret) - return tmp_r_target_runner + raise + return parallel_runner + +def runner_worker(target_runner, aq: Queue, rq: Queue): + while True: + i, experiment, scenario = aq.get() + if i == -1: + break + try: + res = target_runner(experiment, scenario) + except: + res = dict(error=traceback.format_exc()) + finally: + rq.put((i, res)) -def check_windows(scenario): - if scenario.get('parallel', 1) != 1 and os.name == 'nt': - raise NotImplementedError('Parallel running on windows is not supported yet. Follow https://github.com/auto-optimization/iracepy/issues/16 for updates. Alternatively, use Linux or MacOS or the irace R package directly.') +def check_unsupported_scenarios(scenario): + if scenario.get('targetRunnerRetries', 1) > 1: + raise NotImplementedError("targetRunnerRetries is not yet supported by the python binding although it's supported in the irace R package. We recommend you to implement retries in your target runner.") + if 'targetRunnerParallel' in scenario: + raise NotImplementedError("targetRunnerParallel is not yet supported. If you need this feature, consider opening an issue to show us some people actually want to use this.") + +def run_irace(irace, args, q: Queue): + r = irace(*args) + q.put(r) class irace: # Imported R package @@ -118,8 +148,16 @@ def __init__(self, scenario, parameters_table, target_runner): self.parameters = self._pkg.readParameters(text = parameters_table, digits = scenario.get('digits', 4)) # IMPORTANT: We need to save this in a variable or it will be garbage # collected by Python and crash later. - self.r_target_runner = make_target_runner(target_runner) - check_windows(scenario) + self.target_runner = target_runner + self.target_aq = Queue() + self.target_rq = Queue() + self.worker_count = max(self.scenario.get('parallel', 1), 1) + self.workers: list[Process] = [] + for i in range(self.worker_count): + self.workers.append(Process(target=runner_worker, args=(self.target_runner, self.target_aq, self.target_rq))) + for worker in self.workers: + worker.start() + def read_configurations(self, filename=None, text=None): if text is None: @@ -151,10 +189,25 @@ def set_initial(self, x): def run(self): """Returns a Pandas DataFrame, one column per parameter and the row index are the configuration ID.""" - self.scenario['targetRunner'] = self.r_target_runner - res = self._pkg.irace(ListVector(self.scenario), self.parameters) + scenario_a = [None, None] + self.r_target_runner_parallel = make_target_runner_parallel(self.target_aq, self.target_rq, self._pkg.check_output_target_runner, scenario_a) + self.scenario['targetRunnerParallel'] = self.r_target_runner_parallel + self.r_scenario = self._pkg.checkScenario(ListVector(self.scenario)) + self.scenario = r_to_python(self.r_scenario) + self.scenario.pop('targetRunnerParallel', None) + scenario_a[0] = self.scenario + scenario_a[1] = self.r_scenario + try: + res = self._pkg.irace(self.r_scenario, self.parameters) + finally: + self.cleanup() with localconverter(irace_converter): res = ro.conversion.rpy2py(res) # Remove metadata columns. res = res.loc[:, ~res.columns.str.startswith('.')] return res + def cleanup(self): + for i in range(self.worker_count): + self.target_aq.put((-1, None, None)) + self.target_aq.close() + self.target_rq.close() diff --git a/tests/test_dual_annealing.py b/tests/test_dual_annealing.py index 589f22e..b5aeb29 100644 --- a/tests/test_dual_annealing.py +++ b/tests/test_dual_annealing.py @@ -36,18 +36,13 @@ def target_runner(experiment, scenario, lb = LB, ub = UB): # See https://mlopez-ibanez.github.io/irace/reference/defaultScenario.html -if os.name == 'nt': - parallel = 1 -else: - parallel = 2 - scenario = dict( instances = instances, maxExperiments = 180, debugLevel = 3, seed = 123, digits = 5, - parallel= parallel, # It can run in parallel ! + parallel= 2, # It can run in parallel ! logFile = "") def test_run(): @@ -56,19 +51,3 @@ def test_run(): best_confs = tuner.run() # FIXME: assert type Pandas DataFrame print(best_confs) - -def test_fail_windows(): - # FIXME: remove when https://github.com/auto-optimization/iracepy/issues/16 is closed. - if os.name == 'nt': - with pytest.raises(NotImplementedError): - scenario = dict( - instances = instances, - maxExperiments = 180, - debugLevel = 3, - seed = 123, - digits = 5, - parallel= 2, # It can run in parallel ! - logFile = "") - tuner = irace(scenario, parameters_table, target_runner) - tuner.run() - \ No newline at end of file