From 5bb4d008873803df70c8ff1ea0937c5a06509842 Mon Sep 17 00:00:00 2001 From: Marko Toplak Date: Wed, 14 Mar 2018 16:14:54 +0100 Subject: [PATCH 1/5] owneuralnetwork: execution in a threat and progress bar The scikit-learn class in the background does not support callbacks, so I made its verbose output the callback. --- Orange/widgets/model/owneuralnetwork.py | 159 +++++++++++++++++++++++- 1 file changed, 157 insertions(+), 2 deletions(-) diff --git a/Orange/widgets/model/owneuralnetwork.py b/Orange/widgets/model/owneuralnetwork.py index 34a8444b8a4..667bffe94e6 100644 --- a/Orange/widgets/model/owneuralnetwork.py +++ b/Orange/widgets/model/owneuralnetwork.py @@ -1,8 +1,13 @@ +from functools import partial +import logging import re import sys +from unittest.mock import patch +import concurrent.futures -from AnyQt.QtWidgets import QApplication -from AnyQt.QtCore import Qt +from AnyQt.QtWidgets import QApplication, qApp +from AnyQt.QtCore import Qt, QThread +from AnyQt.QtCore import pyqtSlot as Slot from Orange.data import Table from Orange.modelling import NNLearner @@ -10,6 +15,35 @@ from Orange.widgets.settings import Setting from Orange.widgets.utils.owlearnerwidget import OWBaseLearner +from Orange.widgets.utils.concurrent import ( + ThreadExecutor, FutureWatcher, methodinvoke +) + + + +class Task: + """ + A class that will hold the state for an learner evaluation. + """ + future = ... # type: concurrent.futures.Future + watcher = ... # type: FutureWatcher + cancelled = False # type: bool + + def cancel(self): + """ + Cancel the task. + + Set the `cancelled` field to True and block until the future is done. + """ + # set cancelled state + self.cancelled = True + self.future.cancel() + concurrent.futures.wait([self.future]) + + +class CancelThreadException(BaseException): + pass + class OWNNLearner(OWBaseLearner): name = "Neural Network" @@ -58,6 +92,15 @@ def add_main_layout(self): alignment=Qt.AlignRight, callback=self.settings_changed, controlWidth=80) + def setup_layout(self): + super().setup_layout() + + self._task = None # type: Optional[Task] + self._executor = ThreadExecutor() + + # just a test cancel button + gui.button(self.controlArea, self, "Cancel", callback=self.cancel) + def create_learner(self): return self.LEARNER( hidden_layer_sizes=self.get_hidden_layers(), @@ -81,6 +124,118 @@ def get_hidden_layers(self): self.hidden_layers_edit.setText("100,") return layers + def update_model(self): + self.show_fitting_failed(None) + self.model = None + if self.check_data(): + self.__update() + else: + self.Outputs.model.send(self.model) + + @Slot(float) + def setProgressValue(self, value): + assert self.thread() is QThread.currentThread() + self.progressBarSet(value) + + def __update(self): + if self._task is not None: + # First make sure any pending tasks are cancelled. + self.cancel() + assert self._task is None + + self.setBlocking(True) + + self._task = task = Task() + + # A thread safe way to invoke a method + set_progress = methodinvoke(self, "setProgressValue", (float,)) + + max_iter = self.learner.kwargs["max_iter"] + + def callback(iteration=None): + if task.cancelled: + raise CancelThreadException() # this stop the thread + if iteration is not None: + set_progress(iteration/max_iter*100) + + def print_callback(*args, **kwargs): + iters = None + # try to parse iteration number + if args and args[0] and isinstance(args[0], str): + find = re.findall(r"Iteration (\d+)", args[0]) + if find: + iters = int(find[0]) + callback(iters) + + def build_model(data, learner): + if learner.kwargs["solver"] != "lbfgs": + # enable verbose printouts within scikit and redirect them + with patch.dict(learner.kwargs, {"verbose": True}),\ + patch("builtins.print", print_callback): + return learner(data) + else: + # lbfgs solver uses different mechanism + return learner(data) + + build_model_func = partial(build_model, self.data, self.learner) + + self.progressBarInit() + + task.future = self._executor.submit(build_model_func) + task.watcher = FutureWatcher(task.future) + task.watcher.done.connect(self._task_finished) + + @Slot(concurrent.futures.Future) + def _task_finished(self, f): + """ + Parameters + ---------- + f : Future + The future instance holding the built model + """ + assert self.thread() is QThread.currentThread() + assert self._task is not None + assert self._task.future is f + assert f.done() + + self.setBlocking(False) + + self._task = None + self.progressBarFinished() + + try: + self.model = f.result() + except Exception as ex: # pylint: disable=broad-except + # Log the exception with a traceback + log = logging.getLogger() + log.exception(__name__, exc_info=True) + self.model = None + self.show_fitting_failed(ex) + else: + self.model.name = self.learner_name + self.model.instances = self.data + self.Outputs.model.send(self.model) + + def cancel(self): + """ + Cancel the current task (if any). + """ + if self._task is not None: + self._task.cancel() + assert self._task.future.done() + # disconnect the `_task_finished` slot + self._task.watcher.done.disconnect(self._task_finished) + self._task = None + # threads use signals to run functions in the main thread and some + # can still be quoued (perhaps change) + qApp.processEvents() + self.progressBarFinished() + self.setBlocking(False) + + def onDeleteWidget(self): + self.cancel() + super().onDeleteWidget() + if __name__ == "__main__": a = QApplication(sys.argv) From 677e6f8e1375eb584b5e5758978fbc3dc5c41fcf Mon Sep 17 00:00:00 2001 From: Marko Toplak Date: Thu, 15 Mar 2018 11:02:31 +0100 Subject: [PATCH 2/5] Modelling widget tests support asynchronic excution --- Orange/widgets/tests/base.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Orange/widgets/tests/base.py b/Orange/widgets/tests/base.py index a2898526774..0c3eef8ba6b 100644 --- a/Orange/widgets/tests/base.py +++ b/Orange/widgets/tests/base.py @@ -514,13 +514,16 @@ def test_input_data(self): self.assertEqual(self.widget.data, None) self.send_signal("Data", self.data) self.assertEqual(self.widget.data, self.data) + self.wait_until_stop_blocking() def test_input_data_disconnect(self): """Check widget's data and model after disconnecting data from input""" self.send_signal("Data", self.data) self.assertEqual(self.widget.data, self.data) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.send_signal("Data", None) + self.wait_until_stop_blocking() self.assertEqual(self.widget.data, None) self.assertIsNone(self.get_output(self.widget.Outputs.model)) @@ -529,9 +532,11 @@ def test_input_data_learner_adequacy(self): for inadequate in self.inadequate_dataset: self.send_signal("Data", inadequate) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.assertTrue(self.widget.Error.data_error.is_shown()) for valid in self.valid_datasets: self.send_signal("Data", valid) + self.wait_until_stop_blocking() self.assertFalse(self.widget.Error.data_error.is_shown()) def test_input_preprocessor(self): @@ -542,6 +547,7 @@ def test_input_preprocessor(self): randomize, self.widget.preprocessors, 'Preprocessor not added to widget preprocessors') self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.assertEqual( (randomize,), self.widget.learner.preprocessors, 'Preprocessors were not passed to the learner') @@ -551,6 +557,7 @@ def test_input_preprocessors(self): pp_list = PreprocessorList([Randomize(), RemoveNaNColumns()]) self.send_signal("Preprocessor", pp_list) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.assertEqual( (pp_list,), self.widget.learner.preprocessors, '`PreprocessorList` was not added to preprocessors') @@ -560,10 +567,12 @@ def test_input_preprocessor_disconnect(self): randomize = Randomize() self.send_signal("Preprocessor", randomize) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.assertEqual(randomize, self.widget.preprocessors) self.send_signal("Preprocessor", None) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.assertIsNone(self.widget.preprocessors, 'Preprocessors not removed on disconnect.') @@ -585,6 +594,7 @@ def test_output_model(self): self.assertIsNone(self.get_output(self.widget.Outputs.model)) self.send_signal('Data', self.data) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() model = self.get_output(self.widget.Outputs.model) self.assertIsNotNone(model) self.assertIsInstance(model, self.widget.LEARNER.__returns__) @@ -598,6 +608,7 @@ def test_output_learner_name(self): self.widget.name_line_edit.text()) self.widget.name_line_edit.setText(new_name) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.assertEqual(self.get_output("Learner").name, new_name) def test_output_model_name(self): @@ -606,6 +617,7 @@ def test_output_model_name(self): self.widget.name_line_edit.setText(new_name) self.send_signal("Data", self.data) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() self.assertEqual(self.get_output(self.widget.Outputs.model).name, new_name) def _get_param_value(self, learner, param): @@ -626,6 +638,7 @@ def test_parameters_default(self): for dataset in self.valid_datasets: self.send_signal("Data", dataset) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() for parameter in self.parameters: # Skip if the param isn't used for the given data type if self._should_check_parameter(parameter, dataset): @@ -639,6 +652,7 @@ def test_parameters(self): # to only certain problem types for dataset in self.valid_datasets: self.send_signal("Data", dataset) + self.wait_until_stop_blocking() for parameter in self.parameters: # Skip if the param isn't used for the given data type @@ -650,6 +664,7 @@ def test_parameters(self): for value in parameter.values: parameter.set_value(value) self.widget.apply_button.button.click() + self.wait_until_stop_blocking() param = self._get_param_value(self.widget.learner, parameter) self.assertEqual( param, parameter.get_value(), @@ -674,6 +689,7 @@ def test_params_trigger_settings_changed(self): """Check that the learner gets updated whenever a param is changed.""" for dataset in self.valid_datasets: self.send_signal("Data", dataset) + self.wait_until_stop_blocking() for parameter in self.parameters: # Skip if the param isn't used for the given data type From 3fc7292dd2b7d83f84cdeead5e67f414d55b2e17 Mon Sep 17 00:00:00 2001 From: Marko Toplak Date: Thu, 15 Mar 2018 12:49:46 +0100 Subject: [PATCH 3/5] owneuralnetwork: raise max interations limit --- Orange/widgets/model/owneuralnetwork.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Orange/widgets/model/owneuralnetwork.py b/Orange/widgets/model/owneuralnetwork.py index 667bffe94e6..6ed445b95a0 100644 --- a/Orange/widgets/model/owneuralnetwork.py +++ b/Orange/widgets/model/owneuralnetwork.py @@ -87,7 +87,7 @@ def add_main_layout(self): label="Alpha:", decimals=5, alignment=Qt.AlignRight, callback=self.settings_changed, controlWidth=80) self.max_iter_spin = gui.spin( - box, self, "max_iterations", 10, 300, step=10, + box, self, "max_iterations", 10, 10000, step=10, label="Max iterations:", orientation=Qt.Horizontal, alignment=Qt.AlignRight, callback=self.settings_changed, controlWidth=80) From e7c89a2c4af18e46c985f43d30a55eb648c32891 Mon Sep 17 00:00:00 2001 From: Marko Toplak Date: Mon, 19 Mar 2018 11:49:57 +0100 Subject: [PATCH 4/5] owneuralnetwork: connect callbacks to NN through n_iters_ MLPClassifier n_iters_ was made a property which calls a callback. --- Orange/base.py | 5 +++- Orange/classification/neural_network.py | 25 +++++++++++++++++- Orange/modelling/neural_network.py | 7 +++++ Orange/regression/neural_network.py | 12 ++++++++- Orange/widgets/model/owneuralnetwork.py | 35 ++++++++----------------- 5 files changed, 57 insertions(+), 27 deletions(-) diff --git a/Orange/base.py b/Orange/base.py index d71cbc9eea3..845d881c5cb 100644 --- a/Orange/base.py +++ b/Orange/base.py @@ -383,8 +383,11 @@ def __call__(self, data): m.params = self.params return m + def _initialize_wrapped(self): + return self.__wraps__(**self.params) + def fit(self, X, Y, W=None): - clf = self.__wraps__(**self.params) + clf = self._initialize_wrapped() Y = Y.reshape(-1) if W is None or not self.supports_weights: return self.__returns__(clf.fit(X, Y)) diff --git a/Orange/classification/neural_network.py b/Orange/classification/neural_network.py index d3720a7bcdd..53dff79bed4 100644 --- a/Orange/classification/neural_network.py +++ b/Orange/classification/neural_network.py @@ -5,5 +5,28 @@ __all__ = ["NNClassificationLearner"] +class NIterCallbackMixin: + orange_callback = None + + @property + def n_iter_(self): + return self.__orange_n_iter + + @n_iter_.setter + def n_iter_(self, v): + self.__orange_n_iter = v + if self.orange_callback: + self.orange_callback(v) + + +class MLPClassifierWCallback(skl_nn.MLPClassifier, NIterCallbackMixin): + pass + + class NNClassificationLearner(NNBase, SklLearner): - __wraps__ = skl_nn.MLPClassifier + __wraps__ = MLPClassifierWCallback + + def _initialize_wrapped(self): + clf = SklLearner._initialize_wrapped(self) + clf.orange_callback = getattr(self, "callback", None) + return clf diff --git a/Orange/modelling/neural_network.py b/Orange/modelling/neural_network.py index 2e988034445..91be5e2c2e4 100644 --- a/Orange/modelling/neural_network.py +++ b/Orange/modelling/neural_network.py @@ -8,3 +8,10 @@ class NNLearner(SklFitter): __fits__ = {'classification': NNClassificationLearner, 'regression': NNRegressionLearner} + + callback = None + + def get_learner(self, problem_type): + learner = super().get_learner(problem_type) + learner.callback = self.callback + return learner diff --git a/Orange/regression/neural_network.py b/Orange/regression/neural_network.py index a8253568879..7a8b553756d 100644 --- a/Orange/regression/neural_network.py +++ b/Orange/regression/neural_network.py @@ -1,9 +1,19 @@ import sklearn.neural_network as skl_nn from Orange.base import NNBase from Orange.regression import SklLearner +from Orange.classification.neural_network import NIterCallbackMixin __all__ = ["NNRegressionLearner"] +class MLPRegressorWCallback(skl_nn.MLPRegressor, NIterCallbackMixin): + pass + + class NNRegressionLearner(NNBase, SklLearner): - __wraps__ = skl_nn.MLPRegressor + __wraps__ = MLPRegressorWCallback + + def _initialize_wrapped(self): + clf = SklLearner._initialize_wrapped(self) + clf.orange_callback = getattr(self, "callback", None) + return clf diff --git a/Orange/widgets/model/owneuralnetwork.py b/Orange/widgets/model/owneuralnetwork.py index 6ed445b95a0..a5568e452ff 100644 --- a/Orange/widgets/model/owneuralnetwork.py +++ b/Orange/widgets/model/owneuralnetwork.py @@ -1,8 +1,8 @@ from functools import partial +import copy import logging import re import sys -from unittest.mock import patch import concurrent.futures from AnyQt.QtWidgets import QApplication, qApp @@ -20,7 +20,6 @@ ) - class Task: """ A class that will hold the state for an learner evaluation. @@ -152,32 +151,20 @@ def __update(self): max_iter = self.learner.kwargs["max_iter"] - def callback(iteration=None): + def callback(iteration): if task.cancelled: raise CancelThreadException() # this stop the thread - if iteration is not None: - set_progress(iteration/max_iter*100) - - def print_callback(*args, **kwargs): - iters = None - # try to parse iteration number - if args and args[0] and isinstance(args[0], str): - find = re.findall(r"Iteration (\d+)", args[0]) - if find: - iters = int(find[0]) - callback(iters) + set_progress(iteration/max_iter*100) + + # copy to set the callback so that the learner output is not modified + # (currently we can not pass callbacks to learners __call__) + learner = copy.copy(self.learner) + learner.callback = callback def build_model(data, learner): - if learner.kwargs["solver"] != "lbfgs": - # enable verbose printouts within scikit and redirect them - with patch.dict(learner.kwargs, {"verbose": True}),\ - patch("builtins.print", print_callback): - return learner(data) - else: - # lbfgs solver uses different mechanism - return learner(data) - - build_model_func = partial(build_model, self.data, self.learner) + return learner(data) + + build_model_func = partial(build_model, self.data, learner) self.progressBarInit() From 48bb3bc6a8264fada3c498005da57b3d8cba64bb Mon Sep 17 00:00:00 2001 From: Ales Erjavec Date: Tue, 20 Mar 2018 16:32:18 +0100 Subject: [PATCH 5/5] owneuralnetwork: Atomic disconnect from task state update notifiers --- Orange/widgets/model/owneuralnetwork.py | 86 +++++++++++++++---------- 1 file changed, 53 insertions(+), 33 deletions(-) diff --git a/Orange/widgets/model/owneuralnetwork.py b/Orange/widgets/model/owneuralnetwork.py index a5568e452ff..41ddd3795f6 100644 --- a/Orange/widgets/model/owneuralnetwork.py +++ b/Orange/widgets/model/owneuralnetwork.py @@ -5,9 +5,9 @@ import sys import concurrent.futures -from AnyQt.QtWidgets import QApplication, qApp -from AnyQt.QtCore import Qt, QThread -from AnyQt.QtCore import pyqtSlot as Slot +from AnyQt.QtWidgets import QApplication +from AnyQt.QtCore import Qt, QThread, QObject +from AnyQt.QtCore import pyqtSlot as Slot, pyqtSignal as Signal from Orange.data import Table from Orange.modelling import NNLearner @@ -15,19 +15,27 @@ from Orange.widgets.settings import Setting from Orange.widgets.utils.owlearnerwidget import OWBaseLearner -from Orange.widgets.utils.concurrent import ( - ThreadExecutor, FutureWatcher, methodinvoke -) +from Orange.widgets.utils.concurrent import ThreadExecutor, FutureWatcher -class Task: +class Task(QObject): """ A class that will hold the state for an learner evaluation. """ - future = ... # type: concurrent.futures.Future - watcher = ... # type: FutureWatcher + done = Signal(object) + progressChanged = Signal(float) + + future = None # type: concurrent.futures.Future + watcher = None # type: FutureWatcher cancelled = False # type: bool + def setFuture(self, future): + if self.future is not None: + raise RuntimeError("future is already set") + self.future = future + self.watcher = FutureWatcher(future, parent=self) + self.watcher.done.connect(self.done) + def cancel(self): """ Cancel the task. @@ -39,8 +47,14 @@ def cancel(self): self.future.cancel() concurrent.futures.wait([self.future]) + def emitProgressUpdate(self, value): + self.progressChanged.emit(value) + + def isInterruptionRequested(self): + return self.cancelled -class CancelThreadException(BaseException): + +class CancelTaskException(BaseException): pass @@ -142,19 +156,21 @@ def __update(self): self.cancel() assert self._task is None - self.setBlocking(True) - - self._task = task = Task() - - # A thread safe way to invoke a method - set_progress = methodinvoke(self, "setProgressValue", (float,)) - max_iter = self.learner.kwargs["max_iter"] + # Setup the task state + task = Task() + lastemitted = 0. + def callback(iteration): - if task.cancelled: - raise CancelThreadException() # this stop the thread - set_progress(iteration/max_iter*100) + nonlocal task # type: Task + nonlocal lastemitted + if task.isInterruptionRequested(): + raise CancelTaskException() + progress = round(iteration / max_iter * 100) + if progress != lastemitted: + task.emitProgressUpdate(progress) + lastemitted = progress # copy to set the callback so that the learner output is not modified # (currently we can not pass callbacks to learners __call__) @@ -162,15 +178,20 @@ def callback(iteration): learner.callback = callback def build_model(data, learner): - return learner(data) + try: + return learner(data) + except CancelTaskException: + return None build_model_func = partial(build_model, self.data, learner) - self.progressBarInit() + task.setFuture(self._executor.submit(build_model_func)) + task.done.connect(self._task_finished) + task.progressChanged.connect(self.setProgressValue) - task.future = self._executor.submit(build_model_func) - task.watcher = FutureWatcher(task.future) - task.watcher.done.connect(self._task_finished) + self._task = task + self.progressBarInit() + self.setBlocking(True) @Slot(concurrent.futures.Future) def _task_finished(self, f): @@ -184,10 +205,9 @@ def _task_finished(self, f): assert self._task is not None assert self._task.future is f assert f.done() - - self.setBlocking(False) - + self._task.deleteLater() self._task = None + self.setBlocking(False) self.progressBarFinished() try: @@ -210,12 +230,12 @@ def cancel(self): if self._task is not None: self._task.cancel() assert self._task.future.done() - # disconnect the `_task_finished` slot - self._task.watcher.done.disconnect(self._task_finished) + # disconnect from the task + self._task.done.disconnect(self._task_finished) + self._task.progressChanged.disconnect(self.setProgressValue) + self._task.deleteLater() self._task = None - # threads use signals to run functions in the main thread and some - # can still be quoued (perhaps change) - qApp.processEvents() + self.progressBarFinished() self.setBlocking(False)