Skip to content

Commit

Permalink
use test for pause and play
Browse files Browse the repository at this point in the history
  • Loading branch information
superstar54 committed Nov 30, 2024
1 parent e8c3313 commit c0991ed
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 33 deletions.
10 changes: 0 additions & 10 deletions aiida_workgraph/utils/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ def get_task_state_info(node, name: str, key: str) -> str:
return value


def set_task_state_info(node, name: str, key: str, value: any) -> None:
"""Set task state info to base.extras."""
from aiida.orm.utils.serialize import serialize

if key == "process":
node.base.extras.set(f"_task_{key}_{name}", serialize(value))
else:
node.base.extras.set(f"_task_{key}_{name}", value)


def pause_tasks(pk: int, tasks: list, timeout: int = 5, wait: bool = False):
"""Pause task."""
node = orm.load_node(pk)
Expand Down
34 changes: 11 additions & 23 deletions tests/test_workgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,42 +114,30 @@ def test_extend_workgraph(decorated_add_multiply_group):
assert wg.tasks["group_multiply1"].node.outputs.result == 45


# @pytest.mark.skip(reason="pause task is not stable for the moment.")
@pytest.mark.usefixtures("started_daemon_client")
def test_pause_task_before_submit(wg_calcjob):
wg = wg_calcjob
wg.name = "test_pause_task"
wg.pause_tasks(["add2"])
wg.submit()
# wait for the workgraph to launch add2
wg.wait(tasks={"add2": ["CREATED"]}, timeout=20)
assert wg.tasks["add2"].node.process_state.value.upper() == "CREATED"
assert wg.tasks["add2"].node.process_status == "Paused through WorkGraph"
# I disabled the following lines because the test is not stable
# Seems the daemon is not responding to the play signal
# This should be a problem of AiiDA test fixtures
# wg.play_tasks(["add2"])
# wg.wait(tasks={"add2": ["FINISHED"]})
# assert wg.tasks["add2"].outputs["sum"].value == 9


@pytest.mark.skip(reason="pause task is not stable for the moment.")
def test_pause_task_after_submit(wg_calcjob):
wg = wg_calcjob
wg.tasks["add1"].set({"metadata.options.sleep": 5})
wg.name = "test_pause_task"
# pause add1 before submit
wg.pause_tasks(["add1"])
wg.submit()
# wait for the workgraph to launch add1
wg.wait(tasks={"add1": ["CREATED", "WAITING", "RUNNING", "FINISHED"]}, timeout=20)
wg.wait(tasks={"add1": ["CREATED"]}, timeout=20)
assert wg.tasks["add1"].node.process_state.value.upper() == "CREATED"
assert wg.tasks["add1"].node.process_status == "Paused through WorkGraph"
# pause add2 after submit
wg.pause_tasks(["add2"])
wg.play_tasks(["add1"])
# wait for the workgraph to launch add2
wg.wait(tasks={"add2": ["CREATED"]}, timeout=20)
assert wg.tasks["add2"].node.process_state.value.upper() == "CREATED"
assert wg.tasks["add2"].node.process_status == "Paused through WorkGraph"
# I disabled the following lines because the test is not stable
# Seems the daemon is not responding to the play signal
# wg.play_tasks(["add2"])
# wg.wait(tasks={"add2": ["FINISHED"]})
# assert wg.tasks["add2"].outputs["sum"].value == 9
wg.play_tasks(["add2"])
wg.wait()
assert wg.tasks["add2"].outputs["sum"].value == 9


def test_workgraph_group_outputs(decorated_add):
Expand Down

0 comments on commit c0991ed

Please sign in to comment.