diff --git a/docs/source/howto/run_codes.rst b/docs/source/howto/run_codes.rst index 74203b4a77..26d2c72e3d 100644 --- a/docs/source/howto/run_codes.rst +++ b/docs/source/howto/run_codes.rst @@ -585,6 +585,33 @@ Finally, the monitor needs to be declared using an entry point in the ``aiida.ca The next section will show how this entry point is used to assign it to a calculation job. +.. versionadded:: 2.5.0 + +Monitors can also attach outputs to the calculation that it is monitoring. +This can be useful to report outputs while the calculation is running that should still be stored permanently in the provenance graph. +In the following example, a simple monitor is implemented that returns a ``CalcJobMonitorResult`` that defines a dictionary of output nodes: + +.. code-block:: python + + from aiida.orm import CalcJobNode + from aiida.transports import Transport + + def monitor(node: CalcJobNode, transport: Transport) -> CalcJobMonitorResult: + """Return a dictionary of output nodes to be attached to the calculation to which the monitor is attached.""" + import secrets + return CalcJobMonitorResult( + outputs={ + 'some_output': Int(2), + 'messages': { + f'key_{secrets.token_hex(4)}': Str('some random message') + } + } + ) + +Once the monitor returns, the engine will loop over the nodes in the ``outputs`` dictionary and attach them to the calculation node. +Note that the ``CalcJob`` class of course needs to specify this output port in the output namespace, otherwise an exception is raised. + + How to assign a monitor ----------------------- diff --git a/src/aiida/engine/processes/calcjobs/monitors.py b/src/aiida/engine/processes/calcjobs/monitors.py index 8b5fa6f67d..56af82c711 100644 --- a/src/aiida/engine/processes/calcjobs/monitors.py +++ b/src/aiida/engine/processes/calcjobs/monitors.py @@ -49,6 +49,9 @@ class CalcJobMonitorResult: override_exit_code: bool = True """If set to ``False``, the engine will keep the exit code returned by the parser.""" + outputs: dict[str, t.Any] | None = None + """Optional dictionary of output nodes to be attached to the process.""" + def __post_init__(self): """Validate the attributes.""" self.validate() diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 715bfbaef2..94c0895b80 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -593,6 +593,11 @@ async def _monitor_job(self, node, transport_queue, monitors) -> CalcJobMonitorR monitor_result = await self._launch_task(task_monitor_job, node, transport_queue, monitors=monitors) + if monitor_result and monitor_result.outputs: + for label, output in monitor_result.outputs.items(): + self.process.out(label, output) + self.process.update_outputs() + if monitor_result and monitor_result.action == CalcJobMonitorAction.DISABLE_SELF: monitors.monitors[monitor_result.key].disabled = True diff --git a/tests/engine/processes/calcjobs/test_monitors.py b/tests/engine/processes/calcjobs/test_monitors.py index 1adeca6f16..0593b776e2 100644 --- a/tests/engine/processes/calcjobs/test_monitors.py +++ b/tests/engine/processes/calcjobs/test_monitors.py @@ -1,7 +1,10 @@ """Tests for the :mod:`aiida.engine.processes.calcjobs.monitors` module.""" +from __future__ import annotations + import time import pytest +from aiida.calculations.arithmetic.add import ArithmeticAddCalculation from aiida.calculations.monitors import base from aiida.common.exceptions import EntryPointError from aiida.engine import run_get_node @@ -11,7 +14,33 @@ CalcJobMonitorResult, CalcJobMonitors, ) -from aiida.orm import Dict, Int +from aiida.orm import Dict, Int, Str + + +class StoreMessageCalculation(ArithmeticAddCalculation): + """Subclass of ``ArithmeticAddCalculation`` that just adds the ``messages`` output namespace.""" + + @classmethod + def define(cls, spec): + """Define the process specification, including its inputs, outputs and known exit codes. + + :param spec: the calculation job process spec to define. + """ + super().define(spec) + spec.output_namespace('messages', valid_type=Str) + + +def monitor_store_message(node, transport, **kwargs): # pylint: disable=unused-argument + """Test monitor that returns an output node.""" + import datetime + import secrets + + return CalcJobMonitorResult( + action=CalcJobMonitorAction.DISABLE_ALL, + outputs={ + 'messages': {'datetime_' + datetime.datetime.now().strftime('%Y%m%d_%H%M%S'): Str(secrets.token_hex(15))} + }, + ) def test_calc_job_monitor_result_constructor_invalid(): @@ -175,3 +204,24 @@ def test_calc_job_monitors_process_poll_interval_integrated(entry_points, aiida_ # Check that the number of log messages emitted by the monitor is just 1 as it should have been called just once. logs = [rec.message for rec in aiida_caplog.records if rec.message == 'monitor_emit_warning monitor was called'] assert len(logs) == 1 + + +def test_calc_job_monitors_outputs(entry_points, aiida_local_code_factory): + """Test a monitor that returns outputs to be attached to the node.""" + entry_points.add(StoreMessageCalculation, 'aiida.calculations:core.store_message') + entry_points.add(monitor_store_message, 'aiida.calculations.monitors:core.store_message') + + code = aiida_local_code_factory('core.store_message', '/bin/bash') + builder = code.get_builder() + builder.x = Int(1) + builder.y = Int(1) + builder.monitors = {'store_message': Dict({'entry_point': 'core.store_message', 'minimum_poll_interval': 1})} + builder.metadata = {'options': {'sleep': 3, 'resources': {'num_machines': 1}}} + + _, node = run_get_node(builder) + assert node.is_finished_ok + assert 'messages' in node.outputs + assert len(node.outputs.messages) + for message in node.outputs.messages.values(): + assert isinstance(message, Str) + assert len(message.value) == 30, message.value