Skip to content

Commit

Permalink
Add a test with expected job failures
Browse files: browse the repository at this point in the history
  • Loading branch information
ml-evs committed Jan 8, 2024
1 parent 44a4708 commit 8d78963
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
6 changes: 6 additions & 0 deletions src/jobflow_remote/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ def add(a, b):
return a + b


@job
def always_fails():
    """Raise a ``RuntimeError`` unconditionally.

    Intended for tests that need a job whose execution is guaranteed to fail.

    Raises:
        RuntimeError: on every call.
    """
    message = "This job failed."
    raise RuntimeError(message)


@job
def write_file(n):
with open("results.txt", "w") as f:
Expand Down
7 changes: 6 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ def build_and_launch_container(
try:
print(f"\n * Launching container for {image_name}...")
container = docker_client.containers.run(
image_name, detach=True, remove=True, tty=True, ports=ports
image_name,
detach=True,
remove=True,
auto_remove=True,
tty=True,
ports=ports,
)
assert isinstance(container, Container)
print(" * Waiting for container to be ready...", end="")
Expand Down
38 changes: 36 additions & 2 deletions tests/integration/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ def test_submit_flow(worker, job_controller):
runner = Runner()
runner.run(ticks=10)

jobs = job_controller.get_jobs({})
assert len(jobs) == 2
assert job_controller.count_jobs({}) == 2
assert len(job_controller.get_jobs({})) == 2
assert job_controller.count_flows({}) == 1


@pytest.mark.parametrize("worker", ["test_local_worker", "test_remote_worker"])
Expand All @@ -57,4 +58,37 @@ def test_submit_flow_with_dependencies(worker, job_controller):
runner = Runner()
runner.run(ticks=10)

assert job_controller.count_jobs({}) == 4
assert len(job_controller.get_jobs({})) == 4
assert job_controller.count_flows({}) == 1


@pytest.mark.parametrize("worker", ["test_local_worker", "test_remote_worker"])
def test_expected_failure(worker, job_controller):
    """Submit a flow of two always-failing jobs and check the recorded states.

    The flow is submitted to the parametrized ``worker``, the runner is
    ticked ten times, and the job/flow counts and their states are then
    asserted through ``job_controller``.
    """
    from jobflow import Flow

    from jobflow_remote import submit_flow
    from jobflow_remote.jobs.runner import Runner
    from jobflow_remote.jobs.state import FlowState, JobState
    from jobflow_remote.testing import always_fails

    # Two independent jobs, each of which raises when executed.
    failing_jobs = [always_fails(), always_fails()]
    submit_flow(Flow(failing_jobs), worker=worker)

    Runner().run(ticks=10)

    # Both jobs and their single parent flow must be registered.
    assert job_controller.count_jobs({}) == 2
    assert len(job_controller.get_jobs({})) == 2
    assert job_controller.count_flows({}) == 1

    if worker != "test_local_worker":
        # The remote flow will be left in a running state until the remote
        # errors can be rectified
        assert job_controller.count_jobs(state=JobState.REMOTE_ERROR) == 2
        assert job_controller.count_flows(state=FlowState.RUNNING) == 1
        return

    # Local worker: the failures are recorded directly on the jobs and flow.
    assert job_controller.count_jobs(state=JobState.FAILED) == 2
    assert job_controller.count_flows(state=FlowState.FAILED) == 1

0 comments on commit 8d78963

Please sign in to comment.