diff --git a/changelog.txt b/changelog.txt index 3a2810a01..c4994247c 100644 --- a/changelog.txt +++ b/changelog.txt @@ -5,6 +5,17 @@ Changelog The **signac-flow** package follows `semantic versioning `_. The numbers in brackets denote the related GitHub issue and/or pull request. +Version 0.18 +============ + +[0.18.0] -- 2022-xx-xx +---------------------- + +Added ++++++ + +- Feature to install execution hooks for the automated execution of functions with operations (#28, #189, #508). + Version 0.17 ============ diff --git a/doc/api.rst b/doc/api.rst index f330fdb93..6abc2cf22 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -35,6 +35,11 @@ The FlowProject FlowProject.make_group FlowProject.operation FlowProject.operation.with_directives + FlowProject.operation_hooks + FlowProject.operation_hooks.on_fail + FlowProject.operation_hooks.on_finish + FlowProject.operation_hooks.on_start + FlowProject.operation_hooks.on_success FlowProject.operations FlowProject.post FlowProject.post.copy_from @@ -52,6 +57,7 @@ The FlowProject FlowProject.pre.not_ FlowProject.pre.true FlowProject.print_status + FlowProject.project_hooks FlowProject.run FlowProject.scheduler_jobs FlowProject.submit @@ -60,12 +66,22 @@ The FlowProject .. autoclass:: FlowProject :show-inheritance: :members: - :exclude-members: pre,post,operation + :exclude-members: pre,post,operation,operation_hooks .. automethod:: flow.FlowProject.operation(func, name=None) .. automethod:: flow.FlowProject.operation.with_directives(directives, name=None) +.. automethod:: flow.FlowProject.operation_hooks(hook_func, trigger) + +.. automethod:: flow.FlowProject.operation_hooks.on_fail + +.. automethod:: flow.FlowProject.operation_hooks.on_finish + +.. automethod:: flow.FlowProject.operation_hooks.on_start + +.. automethod:: flow.FlowProject.operation_hooks.on_success + .. automethod:: flow.FlowProject.post .. 
automethod:: flow.FlowProject.post.copy_from diff --git a/flow/__init__.py b/flow/__init__.py index 0b7558b5d..8126c1fd4 100644 --- a/flow/__init__.py +++ b/flow/__init__.py @@ -8,7 +8,7 @@ .. _signac: https://signac.io/ """ -from . import environment, errors, scheduling, testing +from . import environment, errors, hooks, scheduling, testing from .aggregates import aggregator, get_aggregate_id from .environment import get_environment from .operations import cmd, directives, with_job @@ -24,6 +24,7 @@ __all__ = [ "environment", "errors", + "hooks", "scheduling", "testing", "aggregator", diff --git a/flow/hooks/__init__.py b/flow/hooks/__init__.py new file mode 100644 index 000000000..bba7812f1 --- /dev/null +++ b/flow/hooks/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) 2018 The Regents of the University of Michigan +# All rights reserved. +# This software is licensed under the BSD 3-Clause License. +"""Operation hooks.""" +from .hooks import _Hooks + +__all__ = ["_Hooks"] diff --git a/flow/hooks/hooks.py b/flow/hooks/hooks.py new file mode 100644 index 000000000..ba65fc13b --- /dev/null +++ b/flow/hooks/hooks.py @@ -0,0 +1,108 @@ +# Copyright (c) 2018 The Regents of the University of Michigan +# All rights reserved. +# This software is licensed under the BSD 3-Clause License. +"""Operation hooks.""" +import logging + +logger = logging.getLogger(__name__) + + +class _HooksList(list): + def __call__(self, *args, **kwargs): + """Call all hook functions as part of this list.""" + for hook in self: + logger.debug(f"Executing hook function '{hook}'.") + try: + hook(*args, **kwargs) + except Exception as error: + logger.error( + "Error occurred during execution of " + "hook '{}': {}.".format(hook, error) + ) + raise + + +class _Hooks: + """:class:`~._Hooks` execute an action or set of actions at specific stages of operation execution. 
+ + :class:`~._Hooks` can execute a user-defined function when an operation + starts, succeeds, fails, or finishes (regardless of whether the operation + executed successfully or failed). + + Hooks can be installed at the operation level as decorators, or on an + instance of :class:`~.FlowProject` through + :meth:`~.FlowProject.project_hooks`. + + Examples + -------- + The example below shows an operation level decorator that prints the + operation name and job id at the start of the operation execution. + + .. code-block:: python + + def start_hook(operation_name, job): + print(f"Starting operation {operation_name} on job {job.id}.") + + @FlowProject.operation + @FlowProject.operation_hooks.on_start(start_hook) + def foo(job): + pass + + Parameters + ---------- + on_start : list of callables + Function(s) to execute before the operation begins execution. + on_finish : list of callables + Function(s) to execute after the operation exits, with or without errors. + on_success : list of callables + Function(s) to execute after the operation exits without error. + on_fail : list of callables + Function(s) to execute after the operation exits with an error. 
+ + """ + + _hook_triggers = [ + "on_start", + "on_finish", + "on_success", + "on_fail", + ] + + def __init__(self, *, on_start=None, on_finish=None, on_success=None, on_fail=None): + def set_hooks(self, trigger_name, trigger_value): + if trigger_value is None: + trigger_value = [] + setattr(self, trigger_name, _HooksList(trigger_value)) + + set_hooks(self, "on_start", on_start) + set_hooks(self, "on_finish", on_finish) + set_hooks(self, "on_success", on_success) + set_hooks(self, "on_fail", on_fail) + + def __setattr__(self, name, value): + """Convert to _HooksList when setting a hook trigger attribute.""" + if name in self._hook_triggers: + super().__setattr__(name, _HooksList(value)) + else: + super().__setattr__(name, value) + + def update(self, other): + """Update this instance with hooks from another instance.""" + for hook_trigger in self._hook_triggers: + getattr(self, hook_trigger).extend(getattr(other, hook_trigger)) + + def __str__(self): + """Return a string representation of all hooks.""" + return "{}({})".format( + type(self).__name__, + ", ".join( + f"{hook_trigger}={getattr(self, hook_trigger)}" + for hook_trigger in self._hook_triggers + ), + ) + + def __bool__(self): + """Return True if hooks are defined.""" + return any( + getattr(self, hook_trigger, None) for hook_trigger in self._hook_triggers + ) diff --git a/flow/project.py b/flow/project.py index f1e08c6b0..6312ace3a 100644 --- a/flow/project.py +++ b/flow/project.py @@ -57,6 +57,7 @@ UserConditionError, UserOperationError, ) +from .hooks import _Hooks from .labels import _is_label_func, classlabel, label, staticlabel from .render_status import _render_status from .scheduling.base import ClusterJob, JobStatus @@ -1201,6 +1202,7 @@ def __new__(metacls, name, bases, namespace): cls._OPERATION_FUNCTIONS = [] cls._OPERATION_PRECONDITIONS = defaultdict(list) cls._OPERATION_POSTCONDITIONS = defaultdict(list) + cls._OPERATION_HOOK_REGISTRY = defaultdict(lambda: defaultdict(list)) # All label 
functions are registered with the label() classmethod, # which is intended to be used as decorator function. The @@ -1225,6 +1227,8 @@ def __new__(metacls, name, bases, namespace): cls._GROUPS = [] cls._GROUP_NAMES = set() + cls.operation_hooks = cls._setup_hooks_object(parent_class=cls) + return cls @staticmethod @@ -1517,6 +1521,90 @@ def add_operation_with_directives(function): return OperationRegister() + @staticmethod + def _setup_hooks_object(parent_class): + class _HooksRegister: + """Add hooks to an operation. + + This object is designed to be used as a decorator. The example + below shows an operation level decorator that prints the + operation name and job id at the start of the operation + execution. + + .. code-block:: python + + def start_hook(operation_name, job): + print(f"Starting operation {operation_name} on job {job.id}.") + + @FlowProject.operation_hooks.on_start(start_hook) + @FlowProject.operation + def foo(job): + pass + + A hook is a function that is called at specific points during + the execution of a job operation. In the example above, the + ``start_hook`` hook function is executed before the operation + **foo** runs. Hooks can also run after an operation finishes, + when an operation exits with error, or when an operation exits + without error. + + The available triggers are ``on_start``, ``on_finish``, ``on_fail``, and + ``on_success`` which run when the operation starts, completes, fails, and + succeeds respectively. + + Parameters + ---------- + hook_func : callable + The function that will be executed at a specified point. + trigger : string + The point when a hook function is executed. 
+ """ + + _parent_class = parent_class + + def __init__(self, hook_func, trigger): + self._hook_func = hook_func + self._hook_trigger = trigger + + @classmethod + def on_start(cls, hook_func): + """Add a hook function triggered before an operation starts.""" + return cls(hook_func, trigger="on_start") + + @classmethod + def on_finish(cls, hook_func): + """Add a hook function triggered after the operation exits. + + The hook is triggered regardless of whether the operation exits + with or without an error. + """ + return cls(hook_func, trigger="on_finish") + + @classmethod + def on_success(cls, hook_func): + """Add a hook function triggered after the operation exits without error.""" + return cls(hook_func, trigger="on_success") + + @classmethod + def on_fail(cls, hook_func): + """Add a hook function triggered after the operation exits with an error.""" + return cls(hook_func, trigger="on_fail") + + def __call__(self, func): + """Add the decorated function to the operation hook registry. + + Parameters + ---------- + func : callable + The operation function associated with the hook function. + """ + self._parent_class._OPERATION_HOOK_REGISTRY[func][ + self._hook_trigger + ].append(self._hook_func) + return func + + return _HooksRegister + def _config_value_as_bool(value): # Function to interpret a configobj bool-like value as a boolean. @@ -1611,6 +1699,10 @@ def __init__(self, config=None, environment=None, entrypoint=None): ) self._template_environment_ = {} + # Set up execution hooks + self._project_hooks = _Hooks() + self._operation_hooks = defaultdict(_Hooks) + # Register all label functions with this project instance. self._label_functions = {} self._register_labels() @@ -1727,6 +1819,25 @@ def _show_template_help_and_exit(self, template_environment, context): ) sys.exit(2) + @property + def project_hooks(self): + """:class:`.hooks._Hooks` defined for all project operations. + + Project-wide hooks are added to an *instance* of the FlowProject, not + the class. 
For example: + + .. code-block:: python + + def finish_hook(operation_name, job): + print(f"Finished operation {operation_name} on job {job.id}") + + if __name__ == "__main__": + project = FlowProject() + project.project_hooks.on_finish.append(finish_hook) + project.main() + """ + return self._project_hooks + @classmethod def label(cls, label_name_or_func=None): """Designate a function as a label function for this class. @@ -3149,6 +3260,29 @@ def _run_operations_in_parallel(self, pool, operations, progress, timeout): for result in results: result.get(timeout=timeout) + @contextlib.contextmanager + def _run_with_hooks(self, operation): + name = operation.name + jobs = operation._jobs + + # Determine operation hooks + operation_hooks = self._operation_hooks.get(name, _Hooks()) + + self.project_hooks.on_start(name, *jobs) + operation_hooks.on_start(name, *jobs) + try: + yield + except Exception as error: + self.project_hooks.on_fail(name, error, *jobs) + operation_hooks.on_fail(name, error, *jobs) + raise + else: + self.project_hooks.on_success(name, *jobs) + operation_hooks.on_success(name, *jobs) + finally: + self.project_hooks.on_finish(name, *jobs) + operation_hooks.on_finish(name, *jobs) + def _execute_operation(self, operation, timeout=None, pretend=False): """Execute an operation. @@ -3187,7 +3321,8 @@ def _execute_operation(self, operation, timeout=None, pretend=False): operation, operation.cmd, ) - subprocess.run(operation.cmd, shell=True, timeout=timeout, check=True) + with self._run_with_hooks(operation): + subprocess.run(operation.cmd, shell=True, timeout=timeout, check=True) else: # ... 
executing operation in interpreter process as function: logger.debug( @@ -3196,7 +3331,8 @@ def _execute_operation(self, operation, timeout=None, pretend=False): os.getpid(), ) try: - self._operations[operation.name](*operation._jobs) + with self._run_with_hooks(operation): + self._operations[operation.name](*operation._jobs) except Exception as error: raise UserOperationError( f"An exception was raised during operation {operation.name} " @@ -4253,6 +4389,11 @@ def _register_operations(self): "post": postconditions.get(func, None), } + # Update operation hooks + self._operation_hooks[name].update( + _Hooks(**self._OPERATION_HOOK_REGISTRY[func]) + ) + # Construct FlowOperation: if getattr(func, "_flow_cmd", False): self._operations[name] = FlowCmdOperation(cmd=func, **params) diff --git a/tests/define_hooks_install.py b/tests/define_hooks_install.py new file mode 100644 index 000000000..5ba09f461 --- /dev/null +++ b/tests/define_hooks_install.py @@ -0,0 +1,25 @@ +from define_hooks_test_project import ( + _HooksTestProject, + set_job_doc, + set_job_doc_with_error, +) + + +class ProjectLevelHooks: + keys = ( + "installed_start", + "installed_finish", + "installed_success", + "installed_fail", + ) + + def install_hooks(self, project): + project.project_hooks.on_start.append(set_job_doc(self.keys[0])) + project.project_hooks.on_finish.append(set_job_doc(self.keys[1])) + project.project_hooks.on_success.append(set_job_doc(self.keys[2])) + project.project_hooks.on_fail.append(set_job_doc_with_error(self.keys[3])) + return project + + +if __name__ == "__main__": + ProjectLevelHooks().install_hooks(_HooksTestProject()).main() diff --git a/tests/define_hooks_test_project.py b/tests/define_hooks_test_project.py new file mode 100644 index 000000000..7ce7635e8 --- /dev/null +++ b/tests/define_hooks_test_project.py @@ -0,0 +1,85 @@ +import flow +from flow import FlowProject + +HOOKS_ERROR_MESSAGE = "You raised an error! Hooray!" 
+HOOK_KEYS = ("start", "finish", "success", "fail") + + +class _HooksTestProject(FlowProject): + pass + + +def set_job_doc(key): + def set_true(operation_name, job): + job.doc[f"{operation_name}_{key}"] = True + + return set_true + + +def set_job_doc_with_error(key=HOOK_KEYS[-1]): + def set_true_with_error(operation_name, error, job): + job.doc[f"{operation_name}_{key}"] = (True, error.args[0]) + + return set_true_with_error + + +def raise_error(operation_name, job): + raise RuntimeError(HOOKS_ERROR_MESSAGE) + + +@_HooksTestProject.operation_hooks.on_start(set_job_doc(HOOK_KEYS[0])) +@_HooksTestProject.operation_hooks.on_finish(set_job_doc(HOOK_KEYS[1])) +@_HooksTestProject.operation_hooks.on_success(set_job_doc(HOOK_KEYS[2])) +@_HooksTestProject.operation_hooks.on_fail(set_job_doc_with_error()) +@_HooksTestProject.operation +def base(job): + if job.sp.raise_exception: + raise RuntimeError(HOOKS_ERROR_MESSAGE) + + +@_HooksTestProject.operation_hooks.on_start(set_job_doc(HOOK_KEYS[0])) +@_HooksTestProject.operation_hooks.on_finish(set_job_doc(HOOK_KEYS[1])) +@_HooksTestProject.operation_hooks.on_success(set_job_doc(HOOK_KEYS[2])) +@_HooksTestProject.operation_hooks.on_fail(set_job_doc_with_error()) +@_HooksTestProject.operation +@flow.with_job +@flow.cmd +def base_cmd(job): + if job.sp.raise_exception: + return "exit 42" + else: + return "touch base_cmd.txt" + + +@_HooksTestProject.operation +def base_no_decorators(job): + if job.sp.raise_exception: + raise RuntimeError(HOOKS_ERROR_MESSAGE) + + +@_HooksTestProject.operation +@flow.with_job +@flow.cmd +def base_cmd_no_decorators(job): + if job.sp.raise_exception: + return "exit 42" + else: + return "touch base_cmd.txt" + + +@_HooksTestProject.operation_hooks.on_start(raise_error) +@_HooksTestProject.operation +def raise_exception_in_hook(job): + pass + + +@_HooksTestProject.operation_hooks.on_start(raise_error) +@_HooksTestProject.operation +@flow.with_job +@flow.cmd +def raise_exception_in_hook_cmd(job): + pass + + 
+if __name__ == "__main__": + _HooksTestProject().main() diff --git a/tests/template_reference_data.tar.gz b/tests/template_reference_data.tar.gz index ceb821060..96119eb63 100644 Binary files a/tests/template_reference_data.tar.gz and b/tests/template_reference_data.tar.gz differ diff --git a/tests/test_project.py b/tests/test_project.py index b14a6c58b..f5ca8eb45 100644 --- a/tests/test_project.py +++ b/tests/test_project.py @@ -16,6 +16,7 @@ from itertools import groupby from tempfile import TemporaryDirectory +import define_hooks_test_project import pytest import signac from define_aggregate_test_project import _AggregateTestProject @@ -2360,6 +2361,236 @@ def test_main_submit(self, monkeypatch): assert f"exec agg_op3 {get_aggregate_id(project)}" in submit_output +class TestHooksSetUp(TestProjectBase): + error_message = define_hooks_test_project.HOOKS_ERROR_MESSAGE + keys = ["start", "finish", "success", "fail"] + project_class = define_hooks_test_project._HooksTestProject + entrypoint = dict( + path=os.path.realpath( + os.path.join(os.path.dirname(__file__), "define_hooks_test_project.py") + ) + ) + + @staticmethod + def _get_job_doc_key(job, operation_name): + return lambda key: job.doc.get(f"{operation_name}_{key}") + + @pytest.fixture(params=["base"]) + def operation_name(self, request): + return request.param + + def mock_project(self): + project = self.project_class.get_project(root=self._tmp_dir.name) + project.open_job(dict(raise_exception=False)).init() + project.open_job(dict(raise_exception=True)).init() + project = project.get_project(root=self._tmp_dir.name) + project._entrypoint = self.entrypoint + return project + + def call_subcmd(self, subcmd, stderr=subprocess.DEVNULL): + # Bypass raising the error/checking output since it interferes with hook.on_fail + fn_script = self.entrypoint["path"] + _cmd = f"python {fn_script} {subcmd} --debug" + with add_path_to_environment_pythonpath(os.path.abspath(self.cwd)): + try: + with 
switch_to_directory(self.project.root_directory()): + return subprocess.check_output(_cmd.split(), stderr=stderr) + except subprocess.CalledProcessError as error: + print(error, file=sys.stderr) + print(error.output, file=sys.stderr) + raise + + @pytest.fixture(scope="function") + def project(self): + return self.mock_project() + + @pytest.fixture(params=[True, False], ids=["raise_exception", "no_exception"]) + def job(self, request, project): + return project.open_job(dict(raise_exception=request.param)) + + +class TestHooksBase(TestHooksSetUp): + def test_start_and_finish(self, project, job, operation_name): + get_job_doc_value = self._get_job_doc_key(job, operation_name) + + assert get_job_doc_value(self.keys[0]) is None + assert get_job_doc_value(self.keys[1]) is None + + if job.sp.raise_exception: + with pytest.raises(subprocess.CalledProcessError): + self.call_subcmd(f"run -o {operation_name} -j {job.id}") + else: + self.call_subcmd(f"run -o {operation_name} -j {job.id}") + + assert get_job_doc_value(self.keys[0]) + assert get_job_doc_value(self.keys[1]) + + def test_success(self, project, job, operation_name): + get_job_doc_value = self._get_job_doc_key(job, operation_name) + + assert get_job_doc_value(self.keys[2]) is None + + if job.sp.raise_exception: + with pytest.raises(subprocess.CalledProcessError): + self.call_subcmd(f"run -o {operation_name} -j {job.id}") + else: + self.call_subcmd(f"run -o {operation_name} -j {job.id}") + + if job.sp.raise_exception: + assert not get_job_doc_value(self.keys[2]) + else: + assert get_job_doc_value(self.keys[2]) + + def test_fail(self, project, job, operation_name): + get_job_doc_value = self._get_job_doc_key(job, operation_name) + + assert get_job_doc_value(self.keys[3]) is None + + if job.sp.raise_exception: + with pytest.raises(subprocess.CalledProcessError): + self.call_subcmd(f"run -o {operation_name} -j {job.id}") + else: + self.call_subcmd(f"run -o {operation_name} -j {job.id}") + + if job.sp.raise_exception: + 
assert get_job_doc_value(self.keys[3])[0] + assert get_job_doc_value(self.keys[3])[1] == self.error_message + else: + assert get_job_doc_value(self.keys[3]) is None + + +class TestHooksCmd(TestHooksBase): + # Tests hook decorators for a job operation with the @cmd decorator + error_message = 42 + + @pytest.fixture(params=["base_cmd"]) + def operation_name(self, request): + return request.param + + +class TestHooksInstallSetUp(TestHooksSetUp): + entrypoint = dict( + path=os.path.realpath( + os.path.join(os.path.dirname(__file__), "define_hooks_install.py") + ) + ) + + +class TestHooksInstallBase(TestHooksBase, TestHooksInstallSetUp): + # Tests project-wide hooks on job operations with and without operation level hooks + + # Check job document for keys from installed, project-wide hooks + keys = [ + "installed_start", + "installed_finish", + "installed_success", + "installed_fail", + ] + + @pytest.fixture(params=["base", "base_no_decorators"]) + def operation_name(self, request): + return request.param + + +class TestHooksInstallCmd(TestHooksCmd, TestHooksInstallSetUp): + # Tests project-wide hooks on job operations with the @cmd decorator. 
+ # Job operations are with or without operation level hooks + + # Check job document for keys from installed, project-wide hooks + keys = [ + "installed_start", + "installed_finish", + "installed_success", + "installed_fail", + ] + + @pytest.fixture(params=["base_cmd", "base_cmd_no_decorators"]) + def operation_name(self, request): + return request.param + + +class TestHooksInstallWithDecorators(TestHooksBase, TestHooksInstallSetUp): + # Tests if project-wide hooks interfere with operation level hooks + @pytest.fixture(params=["base"]) + def operation_name(self, request): + return request.param + + +class TestHooksInstallCmdWithDecorators(TestHooksCmd, TestHooksInstallSetUp): + # Tests if project-wide hooks interfere with operation level hooks + # in job operations with the @cmd decorator + @pytest.fixture() + def operation_name(self): + return "base_cmd" + + +class TestHooksInstallNoDecorators(TestHooksInstallSetUp): + # Tests if operation level hooks interfere with project-level hooks + @pytest.fixture(params=["base_no_decorators", "base_cmd_no_decorators"]) + def operation_name(self, request): + return request.param + + @pytest.fixture() + def job(self, project): + return project.open_job(dict(raise_exception=False)) + + def test_no_decorator_keys(self, operation_name, job): + get_job_doc_key = self._get_job_doc_key(job, operation_name) + self.call_subcmd(f"run -o {operation_name} -j {job.id}") + for key in self.keys: + assert get_job_doc_key(key) is None + + +class TestHooksInvalidOption(TestHooksSetUp): + def call_subcmd(self, subcmd, stderr=subprocess.STDOUT): + # Return error as output instead of raising error + fn_script = self.entrypoint["path"] + _cmd = f"python {fn_script} {subcmd} --debug" + with add_path_to_environment_pythonpath(os.path.abspath(self.cwd)): + try: + with switch_to_directory(self.project.root_directory()): + return subprocess.check_output(_cmd.split(), stderr=stderr) + except subprocess.CalledProcessError as error: + return 
str(error.output) + + def test_invalid_hook(self): + class A(FlowProject): + pass + + with pytest.raises(AttributeError): + + @A.operation_hooks.invalid_option(lambda operation_name, job: None) + @A.operation + def test_invalid_decorators(_): + pass + + def test_install_invalid_hook(self): + class InstallInvalidHook: + def install_hook(self, project): + project.project_hooks.invalid_option.append( + lambda operation_name, job: None + ) + + with pytest.raises(AttributeError): + InstallInvalidHook().install_hook(self.mock_project()) + + def test_raise_exception_in_hook(self): + job = self.mock_project().open_job(dict(raise_exception=False)) + + error_output = self.call_subcmd(f"run -o raise_exception_in_hook -j {job.id}") + + assert "RuntimeError" in error_output + + def test_raise_exception_in_hook_cmd(self): + job = self.mock_project().open_job(dict(raise_exception=False)) + + error_output = self.call_subcmd( + f"run -o raise_exception_in_hook_cmd -j {job.id}" + ) + + assert "RuntimeError" in error_output + + class TestIgnoreConditions: def test_str(self): expected_results = {