Skip to content

Commit

Permalink
Add test for case where additional store is not known to jobflow-remote
Browse files Browse the repository at this point in the history
  • Loading branch information
ml-evs committed Feb 18, 2024
1 parent 9b0ec8b commit 4abf99b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/jobflow_remote/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,20 @@ def check_env_var() -> str:

@job(big_files="data")
def add_big(a: float, b: float):
"""Adds two numbers together and writes the answer to an artificially large file."""
"""Adds two numbers together and writes the answer to an artificially large file
which is stored in a pre-defined store."""
import pathlib

result = a + b
with open("file.txt", "w") as f:
f.writelines([f"{result}"] * int(1e5))
return Response({"data": pathlib.Path("file.txt"), "result": a + b})


@job(undefined_store="data")
def add_big_undefined_store(a: float, b: float):
"""Adds two numbers together and writes the answer to an artificially large file
which is attempted to be stored in a undefined store."""
import pathlib

result = a + b
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,32 @@ def test_additional_stores(worker, job_controller):
"blob_uuid"
]
assert fs.query({"blob_uuid": blob_uuid})[0]["job_uuid"] == doc["job"]["uuid"]


@pytest.mark.parametrize(
"worker",
["test_local_worker", "test_remote_worker"],
)
def test_undefined_additional_stores(worker, job_controller):
from jobflow import Flow

from jobflow_remote import submit_flow
from jobflow_remote.jobs.runner import Runner
from jobflow_remote.jobs.state import FlowState, JobState
from jobflow_remote.testing import add_big_undefined_store

job = add_big_undefined_store(100, 100)
flow = Flow(job)
submit_flow(flow, worker=worker)

assert job_controller.count_jobs({}) == 1
assert job_controller.count_flows({}) == 1

runner = Runner()
runner.run(ticks=10)

# Probably this should error somewhere
doc = job_controller.get_jobs({})[0]
assert job_controller.count_jobs(state=JobState.COMPLETED) == 1
assert job_controller.count_flows(state=FlowState.COMPLETED) == 1
assert job_controller.jobstore.get_output(uuid=doc["job"]["uuid"])["result"] == 200

0 comments on commit 4abf99b

Please sign in to comment.