From 8b94a72d017b48f9edde2ed03f381912261caadd Mon Sep 17 00:00:00 2001
From: Deyao Chen
Date: Tue, 13 Dec 2022 13:22:29 +0000
Subject: [PATCH] restructure parallel runner

---
 src/irace/__init__.py        | 124 ++++++++++++++++++++++++++++-------
 tests/test_daemon_process.py |   6 --
 tests/test_dual_annealing.py |  24 +------
 tests/test_errors.py         |   9 ---
 4 files changed, 102 insertions(+), 61 deletions(-)

diff --git a/src/irace/__init__.py b/src/irace/__init__.py
index 643131c..5060bfa 100644
--- a/src/irace/__init__.py
+++ b/src/irace/__init__.py
@@ -11,6 +11,9 @@
 from rpy2 import rinterface as ri
 from rpy2.rinterface_lib import na_values
 from rpy2.rinterface_lib.sexp import NACharacterType
+from multiprocessing import Queue, Process
+import json
+from rpy2.rinterface_lib.sexp import NACharacterType
 
 irace_converter = ro.default_converter + numpy2ri.converter + pandas2ri.converter
 
@@ -71,30 +74,66 @@ def r_to_python(data):
             raise KeyError(f'Could not proceed, type {type(data)} of rclass ({data.rclass[0]}) is not defined!')
     return data # We reached the end of recursion
 
-def make_target_runner(py_target_runner):
+def run_with_catch(f, args, kwargs):
+    try:
+        res = f(*args, **kwargs)
+    except:
+        res = dict(error=traceback.format_exc())
+    return res
+
+def make_target_runner_parallel(aq: Queue, rq: Queue, check_output_target_runner, scenario_a, target_runner, has_worker):
     @ri.rternalize
-    def tmp_r_target_runner(experiment, scenario):
-        py_experiment = r_to_python(experiment)
-        py_scenario = r_to_python(scenario)
-        # FIXME: How to skip this conversion?
-        py_experiment['configuration'] = py_experiment['configuration'].to_dict('records')[0]
-        # FIXME: We should also filter 'switches'
-        # Filter all the NaN from keys in the dictionary
-        py_experiment['configuration'] = OrderedDict(
-            (k,v) for k,v in py_experiment['configuration'].items() if not pd.isna(v)
-        )
+    def parallel_runner(*args, **kwargs):
         try:
-            with localconverter(irace_converter_hack):
-                ret = py_target_runner(py_experiment, py_scenario)
+            experiments = list(r_to_python(args[0]).values())
+            n = len(experiments)
+
+            ans = [None for i in range(n)]
+            for i, experiment in enumerate(experiments):
+                # FIXME: How to skip this conversion?
+                experiment['configuration'] = experiment['configuration'].to_dict('records')[0]
+                # FIXME: We should also filter 'switches'
+                # Filter out keys whose value is NaN from the configuration
+                experiment['configuration'] = OrderedDict(
+                    (k,v) for k,v in experiment['configuration'].items() if not pd.isna(v)
+                )
+                if has_worker:
+                    aq.put((i, experiment, scenario_a[0]))
+                else:
+                    res = run_with_catch(target_runner, (experiment, scenario_a[0]), {})
+                    res = check_output_target_runner(ListVector(res), scenario_a[1])
+                    ans[i] = res
+
+            if has_worker:
+                for _ in range(n):
+                    i, res = rq.get()
+                    with localconverter(irace_converter_hack):
+                        res = check_output_target_runner(ListVector(res), scenario_a[1])
+                    ans[i] = res
+
+            return ListVector(zip(range(len(ans)), ans))
         except:
+            # rpy2 swallows the traceback from any ri.rternalize function, so we print it manually.
             traceback.print_exc()
-            ret = dict(error=traceback.format_exc())
-        return ListVector(ret)
-    return tmp_r_target_runner
+            raise
+    return parallel_runner
 
-def check_windows(scenario):
-    if scenario.get('parallel', 1) != 1 and os.name == 'nt':
-        raise NotImplementedError('Parallel running on windows is not supported yet. Follow https://github.com/auto-optimization/iracepy/issues/16 for updates. Alternatively, use Linux or MacOS or the irace R package directly.')
+def runner_worker(target_runner, aq: Queue, rq: Queue):
+    while True:
+        i, experiment, scenario = aq.get()
+        if i == -1:
+            break
+        rq.put((i, run_with_catch(target_runner, (experiment, scenario), {})))
+
+def check_unsupported_scenarios(scenario):
+    if scenario.get('targetRunnerRetries', 1) > 1:
+        raise NotImplementedError("targetRunnerRetries is not yet supported by the Python binding, although it is supported by the irace R package. We recommend implementing retries in your target runner.")
+    if 'targetRunnerParallel' in scenario:
+        raise NotImplementedError("targetRunnerParallel is not yet supported. If you need this feature, consider opening an issue to let us know there is demand for it.")
+
+def run_irace(irace, args, q: Queue):
+    r = irace(*args)
+    q.put(r)
 
 class irace:
     # Imported R package
@@ -111,8 +150,21 @@ def __init__(self, scenario, parameters_table, target_runner):
         self.parameters = self._pkg.readParameters(text = parameters_table, digits = scenario.get('digits', 4))
         # IMPORTANT: We need to save this in a variable or it will be garbage
         # collected by Python and crash later.
-        self.r_target_runner = make_target_runner(target_runner)
-        check_windows(scenario)
+        self.target_runner = target_runner
+        self.worker_count = max(self.scenario.get('parallel', 1), 1)
+        if self.worker_count != 1:
+            self.target_aq = Queue()
+            self.target_rq = Queue()
+        else:
+            self.target_aq = None
+            self.target_rq = None
+        self.workers: list[Process] = []
+        if self.worker_count != 1:
+            for i in range(self.worker_count):
+                self.workers.append(Process(target=runner_worker, args=(self.target_runner, self.target_aq, self.target_rq)))
+            for worker in self.workers:
+                worker.start()
+
 
     def read_configurations(self, filename=None, text=None):
         if text is None:
@@ -146,11 +198,37 @@ def set_initial(self, x):
     def run(self):
         """Returns a Pandas DataFrame, one column per parameter and the row
         index are the configuration ID."""
-        self.scenario['targetRunner'] = self.r_target_runner
+        scenario_a = [None, None]
+        self.r_target_runner_parallel = make_target_runner_parallel(self.target_aq, self.target_rq, self._pkg.check_output_target_runner, scenario_a, self.target_runner, self.worker_count != 1)
+        self.scenario['targetRunnerParallel'] = self.r_target_runner_parallel
+
         with localconverter(irace_converter_hack):
-            res = self._pkg.irace(ListVector(self.scenario), self.parameters)
+            self.r_scenario = self._pkg.checkScenario(ListVector(self.scenario))
+        self.scenario = r_to_python(self.r_scenario)
+        self.scenario.pop('targetRunnerParallel', None)
+        scenario_a[0] = self.scenario
+        scenario_a[1] = self.r_scenario
+        try:
+            with localconverter(irace_converter_hack):
+                res = self._pkg.irace(self.r_scenario, self.parameters)
+            print(res)
+        except:
+            self.cleanup(True)
+            raise
+        self.cleanup(False)
         with localconverter(irace_converter):
            res = ro.conversion.rpy2py(res)
         # Remove metadata columns.
res = res.loc[:, ~res.columns.str.startswith('.')] return res + + def cleanup(self, forced): + if self.worker_count == 1: + return + if forced: + for worker in self.workers: + worker.terminate() + for i in range(self.worker_count): + self.target_aq.put((-1, None, None)) + self.target_aq.close() + self.target_rq.close() diff --git a/tests/test_daemon_process.py b/tests/test_daemon_process.py index bf97f40..adb8d0e 100644 --- a/tests/test_daemon_process.py +++ b/tests/test_daemon_process.py @@ -4,16 +4,12 @@ from irace import irace import pandas as pd from multiprocessing import Process -import os import json def target_runner(experiment, scenario): Process(target=print, args=(1,)).start() return dict(cost=experiment['configuration']['one']) -def is_windows(): - return os.name == 'nt' - params = ''' one "" r (0, 1) ''' @@ -32,8 +28,6 @@ def is_windows(): def test(): - if is_windows(): - return tuner = irace(scenario, params, target_runner) tuner.set_initial(defaults) best_conf = tuner.run() diff --git a/tests/test_dual_annealing.py b/tests/test_dual_annealing.py index 67adb86..2443e21 100644 --- a/tests/test_dual_annealing.py +++ b/tests/test_dual_annealing.py @@ -37,18 +37,13 @@ def target_runner(experiment, scenario, lb = LB, ub = UB): # See https://mlopez-ibanez.github.io/irace/reference/defaultScenario.html -if os.name == 'nt': - parallel = 1 -else: - parallel = 2 - scenario = dict( instances = instances, maxExperiments = 180, debugLevel = 3, seed = 123, digits = 5, - parallel= parallel, # It can run in parallel ! + parallel= 2, # It can run in parallel ! logFile = "") def run_irace(scenario, parameters_table, target_runner): @@ -56,23 +51,6 @@ def run_irace(scenario, parameters_table, target_runner): tuner.set_initial_from_str(default_values) best_confs = tuner.run() # FIXME: assert type Pandas DataFrame - print(best_confs) - -def test_fail_windows(): - # FIXME: remove when https://github.com/auto-optimization/iracepy/issues/16 is closed. - if os.name == 'nt': - with pytest.raises(NotImplementedError): - scenario = dict( - instances = instances, - maxExperiments = 180, - debugLevel = 3, - seed = 123, - digits = 5, - parallel= 2, # It can run in parallel ! - logFile = "") - tuner = irace(scenario, parameters_table, target_runner) - tuner.run() - def test_run(): run_irace(scenario, parameters_table, target_runner) diff --git a/tests/test_errors.py b/tests/test_errors.py index 0357f12..4efeb91 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -4,16 +4,12 @@ from multiprocessing import Process, Queue from threading import Timer, Thread, Event from time import sleep -import os BASE_TIME = 100 def target_runner(experiment, scenario): raise ValueError() -def is_windows(): - return os.name == 'nt' - params = ''' one "" r (0, 1) ''' @@ -58,8 +54,6 @@ def test_no_hang1(): assert not killed def test_no_hang2(): - if is_windows(): - return q = Queue() p = Process(target=start_irace, args=(q, scenario2)) p.start() @@ -67,7 +61,6 @@ def test_no_hang2(): t2 = Timer(BASE_TIME + 1, sigkill_process, args=(p,)) t1.start() t2.start() - print("jfjfjfj") for i in range(BASE_TIME + 2): sleep(1) if not p.is_alive(): @@ -113,8 +106,6 @@ def test_correct_exit1(): assert not q.empty() def test_correct_exit2(): - if is_windows(): - return q = Queue() p = Process(target=start_irace, args=(q, scenario2)) p.start()
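
Usage note: after this change, setting parallel to a value greater than 1 in the scenario makes the Python binding start that many worker processes (runner_worker above) and dispatch experiments to them through a pair of multiprocessing queues, instead of relying on R-side parallelism; this is what lifts the old Windows restriction exercised by the deleted test code. A minimal sketch of the calling pattern follows. It mirrors tests/test_dual_annealing.py and tests/test_daemon_process.py, but the parameter space, target runner, instance list, and budget are illustrative placeholders, not part of this patch.

from irace import irace

# Illustrative one-parameter space in irace's parameter-file syntax
# (taken from the tests above); not part of this patch.
params = '''
one "" r (0, 1)
'''

def target_runner(experiment, scenario):
    # The cost of a configuration is simply the value of its parameter "one".
    return dict(cost=experiment['configuration']['one'])

scenario = dict(
    instances = [0],        # placeholder instance list
    maxExperiments = 180,   # placeholder budget
    seed = 123,
    parallel = 2,           # two worker processes are started in irace.__init__
    logFile = "")

if __name__ == '__main__':
    # The __main__ guard matters because irace() starts multiprocessing
    # workers at construction time; with the "spawn" start method the
    # module is re-imported in each child process.
    tuner = irace(scenario, params, target_runner)
    best_confs = tuner.run()   # pandas DataFrame, one row per returned configuration
    print(best_confs)

Worker shutdown is handled by cleanup(), which sends a (-1, None, None) sentinel to each worker on normal exit and terminates them if irace raises.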