diff --git a/src/irace/__init__.py b/src/irace/__init__.py index 0aa8897..fd92483 100644 --- a/src/irace/__init__.py +++ b/src/irace/__init__.py @@ -14,6 +14,7 @@ from rpy2.robjects.conversion import localconverter from rpy2 import rinterface as ri from rpy2.rinterface_lib import na_values +from multiprocessing import Queue, Process @ro.default_converter.rpy2py.register(ri.IntSexpVector) def to_int(obj): @@ -80,29 +81,57 @@ def r_to_python(data): raise KeyError(f'Could not proceed, type {type(data)} of rclass ({data.rclass[0]}) is not defined!') return data # We reached the end of recursion -def make_target_runner(py_target_runner): - @ri.rternalize - def tmp_r_target_runner(experiment, scenario): - py_experiment = r_to_python(experiment) - py_scenario = r_to_python(scenario) - # FIXME: How to skip this conversion? - py_experiment['configuration'] = py_experiment['configuration'].to_dict('records')[0] - # FIXME: We should also filter 'switches' - # Filter all the NaN from keys in the dictionary - py_experiment['configuration'] = OrderedDict( - (k,v) for k,v in py_experiment['configuration'].items() if not pd.isna(v) - ) +def make_target_runner(py_target_runner, check_output_target_runner, scenario): + def tmp_target_runner(experiment, _): try: - ret = py_target_runner(py_experiment, py_scenario) + ret = py_target_runner(experiment, scenario) except: traceback.print_exc() ret = dict(error=traceback.format_exc()) + + ret = check_output_target_runner(ListVector(ret), ListVector(scenario)) return ListVector(ret) - return tmp_r_target_runner + return tmp_target_runner -def check_windows(scenario): - if scenario.get('parallel', 1) != 1 and os.name == 'nt': - raise NotImplementedError('Parallel running on windows is not supported yet. Follow https://github.com/auto-optimization/iracepy/issues/16 for updates. Alternatively, use Linux or MacOS or the irace R package directly.') +def make_target_runner_parallel(aq: Queue, rq: Queue, check_output_target_runner, scenario_a): + @ri.rternalize + def parallel_runner(*args, **kwargs): + try: + experiments = list(r_to_python(args[0]).values()) + n = len(experiments) + + for i, experiment in enumerate(experiments): + # FIXME: How to skip this conversion? + experiment['configuration'] = experiment['configuration'].to_dict('records')[0] + # FIXME: We should also filter 'switches' + # Filter all the NaN from keys in the dictionary + experiment['configuration'] = OrderedDict( + (k,v) for k,v in experiment['configuration'].items() if not pd.isna(v) + ) + aq.put((i, experiment, scenario_a[0])) + ans = [None for i in range(n)] + for _ in range(n): + i, res = rq.get() + res = check_output_target_runner(ListVector(res), scenario_a[1]) + ans[i] = res + return ListVector(zip(range(len(ans)), ans)) + except: + traceback.print_exc() + return parallel_runner + +def runner_worker(target_runner, aq: Queue, rq: Queue): + while True: + i, experiment, scenario = aq.get() + if i == -1: + break + res = target_runner(experiment, scenario) + rq.put((i, res)) + +def check_unsupported_scenarios(scenario): + if scenario.get('targetRunnerRetries', 1) > 1: + raise NotImplementedError("targetRunnerRetries is not yet supported by the python binding although it's supported in the irace R package. We recommend you to implement retries in your target runner.") + if 'targetRunnerParallel' in scenario: + raise NotImplementedError("targetRunnerParallel is not yet supported. If you need this feature, consider opening an issue to show us some people actually want to use this.") class irace: # Imported R package @@ -118,8 +147,16 @@ def __init__(self, scenario, parameters_table, target_runner): self.parameters = self._pkg.readParameters(text = parameters_table, digits = scenario.get('digits', 4)) # IMPORTANT: We need to save this in a variable or it will be garbage # collected by Python and crash later. - self.r_target_runner = make_target_runner(target_runner) - check_windows(scenario) + self.target_runner = target_runner + self.target_aq = Queue() + self.target_rq = Queue() + self.worker_count = max(self.scenario.get('parallel', 1), 1) + self.workers: list[Process] = [] + for i in range(self.worker_count): + self.workers.append(Process(target=runner_worker, args=(self.target_runner, self.target_aq, self.target_rq), daemon=True)) + for worker in self.workers: + worker.start() + def read_configurations(self, filename=None, text=None): if text is None: @@ -151,10 +188,21 @@ def set_initial(self, x): def run(self): """Returns a Pandas DataFrame, one column per parameter and the row index are the configuration ID.""" - self.scenario['targetRunner'] = self.r_target_runner - res = self._pkg.irace(ListVector(self.scenario), self.parameters) + scenario_a = [None, None] + self.r_target_runner_parallel = make_target_runner_parallel(self.target_aq, self.target_rq, self._pkg.check_output_target_runner, scenario_a) + self.scenario['targetRunnerParallel'] = self.r_target_runner_parallel + self.r_scenario = self._pkg.checkScenario(ListVector(self.scenario)) + self.scenario = r_to_python(self.r_scenario) + self.scenario.pop('targetRunnerParallel', None) + scenario_a[0] = self.scenario + scenario_a[1] = self.r_scenario + res = self._pkg.irace(self.r_scenario, self.parameters) with localconverter(irace_converter): res = ro.conversion.rpy2py(res) # Remove metadata columns. res = res.loc[:, ~res.columns.str.startswith('.')] + for i in range(self.worker_count): + self.target_aq.put((-1, None, None)) + self.target_aq.close() + self.target_rq.close() return res diff --git a/tests/test_dual_annealing.py b/tests/test_dual_annealing.py index 589f22e..b5aeb29 100644 --- a/tests/test_dual_annealing.py +++ b/tests/test_dual_annealing.py @@ -36,18 +36,13 @@ def target_runner(experiment, scenario, lb = LB, ub = UB): # See https://mlopez-ibanez.github.io/irace/reference/defaultScenario.html -if os.name == 'nt': - parallel = 1 -else: - parallel = 2 - scenario = dict( instances = instances, maxExperiments = 180, debugLevel = 3, seed = 123, digits = 5, - parallel= parallel, # It can run in parallel ! + parallel= 2, # It can run in parallel ! logFile = "") def test_run(): @@ -56,19 +51,3 @@ def test_run(): best_confs = tuner.run() # FIXME: assert type Pandas DataFrame print(best_confs) - -def test_fail_windows(): - # FIXME: remove when https://github.com/auto-optimization/iracepy/issues/16 is closed. - if os.name == 'nt': - with pytest.raises(NotImplementedError): - scenario = dict( - instances = instances, - maxExperiments = 180, - debugLevel = 3, - seed = 123, - digits = 5, - parallel= 2, # It can run in parallel ! - logFile = "") - tuner = irace(scenario, parameters_table, target_runner) - tuner.run() - \ No newline at end of file