Skip to content

Commit

Permalink
Engine: Allow CalcJob monitors to return outputs
Browse files Browse the repository at this point in the history
The `CalcJobMonitorResult` dataclass adds the attribute `outputs`. It
takes a dictionary of nodes that the engine will attach to the
calculation node to which the monitor is attached just as the outputs
of a `Parser` would be.
  • Loading branch information
sphuber committed Feb 2, 2024
1 parent 28adaca commit b7e59a0
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 1 deletion.
27 changes: 27 additions & 0 deletions docs/source/howto/run_codes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,33 @@ Finally, the monitor needs to be declared using an entry point in the ``aiida.ca
The next section will show how this entry point is used to assign it to a calculation job.


.. versionadded:: 2.5.0

Monitors can also attach outputs to the calculation that it is monitoring.
This can be useful to report outputs while the calculation is running that should still be stored permanently in the provenance graph.
In the following example, a simple monitor is implemented that returns a ``CalcJobMonitorResult`` that defines a dictionary of output nodes:

.. code-block:: python
from aiida.orm import CalcJobNode
from aiida.transports import Transport
def monitor(node: CalcJobNode, transport: Transport) -> CalcJobMonitorResult:
"""Return a dictionary of output nodes to be attached to the calculation to which the monitor is attached."""
import secrets
return CalcJobMonitorResult(
outputs={
'some_output': Int(2),
'messages': {
f'key_{secrets.token_hex(4)}': Str('some random message')
}
}
)
Once the monitor returns, the engine will loop over the nodes in the ``outputs`` dictionary and attach them to the calculation node.
Note that the ``CalcJob`` class of course needs to specify this output port in the output namespace, otherwise an exception is raised.


How to assign a monitor
-----------------------

Expand Down
3 changes: 3 additions & 0 deletions src/aiida/engine/processes/calcjobs/monitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class CalcJobMonitorResult:
override_exit_code: bool = True
"""If set to ``False``, the engine will keep the exit code returned by the parser."""

outputs: dict[str, t.Any] | None = None
"""Optional dictionary of output nodes to be attached to the process."""

def __post_init__(self):
"""Validate the attributes."""
self.validate()
Expand Down
5 changes: 5 additions & 0 deletions src/aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,11 @@ async def _monitor_job(self, node, transport_queue, monitors) -> CalcJobMonitorR

monitor_result = await self._launch_task(task_monitor_job, node, transport_queue, monitors=monitors)

if monitor_result and monitor_result.outputs:
for label, output in monitor_result.outputs.items():
self.process.out(label, output)
self.process.update_outputs()

if monitor_result and monitor_result.action == CalcJobMonitorAction.DISABLE_SELF:
monitors.monitors[monitor_result.key].disabled = True

Expand Down
52 changes: 51 additions & 1 deletion tests/engine/processes/calcjobs/test_monitors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Tests for the :mod:`aiida.engine.processes.calcjobs.monitors` module."""
from __future__ import annotations

import time

import pytest
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
from aiida.calculations.monitors import base
from aiida.common.exceptions import EntryPointError
from aiida.engine import run_get_node
Expand All @@ -11,7 +14,33 @@
CalcJobMonitorResult,
CalcJobMonitors,
)
from aiida.orm import Dict, Int
from aiida.orm import Dict, Int, Str


class StoreMessageCalculation(ArithmeticAddCalculation):
"""Subclass of ``ArithmeticAddCalculation`` that just adds the ``messages`` output namespace."""

@classmethod
def define(cls, spec):
"""Define the process specification, including its inputs, outputs and known exit codes.
:param spec: the calculation job process spec to define.
"""
super().define(spec)
spec.output_namespace('messages', valid_type=Str)


def monitor_store_message(node, transport, **kwargs): # pylint: disable=unused-argument
"""Test monitor that returns an output node."""
import datetime
import secrets

return CalcJobMonitorResult(
action=CalcJobMonitorAction.DISABLE_ALL,
outputs={
'messages': {'datetime_' + datetime.datetime.now().strftime('%Y%m%d_%H%M%S'): Str(secrets.token_hex(15))}
},
)


def test_calc_job_monitor_result_constructor_invalid():
Expand Down Expand Up @@ -175,3 +204,24 @@ def test_calc_job_monitors_process_poll_interval_integrated(entry_points, aiida_
# Check that the number of log messages emitted by the monitor is just 1 as it should have been called just once.
logs = [rec.message for rec in aiida_caplog.records if rec.message == 'monitor_emit_warning monitor was called']
assert len(logs) == 1


def test_calc_job_monitors_outputs(entry_points, aiida_local_code_factory):
"""Test a monitor that returns outputs to be attached to the node."""
entry_points.add(StoreMessageCalculation, 'aiida.calculations:core.store_message')
entry_points.add(monitor_store_message, 'aiida.calculations.monitors:core.store_message')

code = aiida_local_code_factory('core.store_message', '/bin/bash')
builder = code.get_builder()
builder.x = Int(1)
builder.y = Int(1)
builder.monitors = {'store_message': Dict({'entry_point': 'core.store_message', 'minimum_poll_interval': 1})}
builder.metadata = {'options': {'sleep': 3, 'resources': {'num_machines': 1}}}

_, node = run_get_node(builder)
assert node.is_finished_ok
assert 'messages' in node.outputs
assert len(node.outputs.messages)
for message in node.outputs.messages.values():
assert isinstance(message, Str)
assert len(message.value) == 30, message.value

0 comments on commit b7e59a0

Please sign in to comment.