From 498bc8aff640143b0cc8f5da2ba9f5830926bec6 Mon Sep 17 00:00:00 2001
From: Matthew Evans
Date: Tue, 30 Jan 2024 22:53:50 +0000
Subject: [PATCH 1/3] Add test for additional stores with GridFS

- Manually connect to GridFS and check for the file
- Read from GridFS store directly
- Fix typo

---
 src/jobflow_remote/testing/__init__.py | 13 +++++-
 tests/integration/conftest.py          | 11 ++++-
 tests/integration/test_slurm.py        | 63 ++++++++++++++++++++++++++
 3 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/src/jobflow_remote/testing/__init__.py b/src/jobflow_remote/testing/__init__.py
index 4aafe91e..53d40963 100644
--- a/src/jobflow_remote/testing/__init__.py
+++ b/src/jobflow_remote/testing/__init__.py
@@ -2,7 +2,7 @@
 
 from typing import Callable, Optional, Union
 
-from jobflow import job
+from jobflow import Response, job
 
 
 @job
@@ -41,3 +41,14 @@ def check_env_var() -> str:
     import os
 
     return os.environ.get("TESTING_ENV_VAR", "unset")
+
+
+@job(big_files="data")
+def add_big(a: float, b: float):
+    """Adds two numbers together and writes the answer to an artificially large file."""
+    import pathlib
+
+    result = a + b
+    with open("file.txt", "w") as f:
+        f.writelines([f"{result}"] * int(1e5))
+    return Response({"data": pathlib.Path("file.txt"), "result": a + b})
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 903d0415..c38caf8a 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -196,7 +196,16 @@ def write_tmp_settings(
             "host": "localhost",
             "port": db_port,
             "collection_name": "docs",
-        }
+        },
+        "additional_stores": {
+            "big_files": {
+                "type": "GridFSStore",
+                "database": store_database_name,
+                "host": "localhost",
+                "port": db_port,
+                "collection_name": "data",
+            },
+        },
     },
     queue={
         "store": {
diff --git a/tests/integration/test_slurm.py b/tests/integration/test_slurm.py
index 7dc849e9..53ef0b1f 100644
--- a/tests/integration/test_slurm.py
+++ b/tests/integration/test_slurm.py
@@ -234,3 +234,66 @@ def test_exec_config(worker, job_controller, random_project_name):
     job = job_controller.get_jobs({})[0]
     output = job_controller.jobstore.get_output(uuid=job["uuid"])
     assert output == random_project_name
+
+
+@pytest.mark.parametrize(
+    "worker",
+    ["test_local_worker", "test_remote_worker"],
+)
+def test_additional_stores(worker, job_controller):
+    from jobflow import Flow
+
+    from jobflow_remote import submit_flow
+    from jobflow_remote.jobs.runner import Runner
+    from jobflow_remote.jobs.state import FlowState, JobState
+    from jobflow_remote.testing import add_big
+
+    job = add_big(100, 100)
+    flow = Flow(job)
+    submit_flow(flow, worker=worker)
+
+    assert job_controller.count_jobs({}) == 1
+    assert job_controller.count_flows({}) == 1
+
+    runner = Runner()
+    runner.run(ticks=10)
+
+    doc = job_controller.get_jobs({})[0]
+    fs = job_controller.jobstore.additional_stores["big_files"]
+    assert fs.count({"job_uuid": doc["job"]["uuid"]}) == 1
+    assert job_controller.count_jobs(state=JobState.COMPLETED) == 1
+    assert job_controller.count_flows(state=FlowState.COMPLETED) == 1
+    assert job_controller.jobstore.get_output(uuid=doc["job"]["uuid"])["result"] == 200
+    blob_uuid = job_controller.jobstore.get_output(uuid=doc["job"]["uuid"])["data"][
+        "blob_uuid"
+    ]
+    assert list(fs.query({"blob_uuid": blob_uuid}))[0]["job_uuid"] == doc["job"]["uuid"]
+
+
+@pytest.mark.parametrize(
+    "worker",
+    ["test_local_worker", "test_remote_worker"],
+)
+def test_undefined_additional_stores(worker, job_controller):
+    from jobflow import Flow
+
+    from jobflow_remote import submit_flow
+    from jobflow_remote.jobs.runner import Runner
+    from jobflow_remote.jobs.state import FlowState, JobState
+    from jobflow_remote.testing import add_big_undefined_store
+
+    job = add_big_undefined_store(100, 100)
+    flow = Flow(job)
+    submit_flow(flow, worker=worker)
+
+    assert job_controller.count_jobs({}) == 1
+    assert job_controller.count_flows({}) == 1
+
+    runner = Runner()
+    runner.run(ticks=10)
+
+    # Probably this should error somewhere
+    doc = job_controller.get_jobs({})[0]
+    assert job_controller.count_jobs(state=JobState.COMPLETED) == 1
+    assert job_controller.count_flows(state=FlowState.COMPLETED) == 1
+    assert job_controller.jobstore.get_output(uuid=doc["job"]["uuid"])["result"] == 200

From 4df07955e9c1f96c1f83440225d8c3b1dc512511 Mon Sep 17 00:00:00 2001
From: Matthew Evans
Date: Sun, 18 Feb 2024 15:01:49 +0000
Subject: [PATCH 2/3] Add test for case where additional store is not known
 to jobflow-remote

---
 src/jobflow_remote/testing/__init__.py | 15 ++++++++++++++-
 tests/integration/test_slurm.py        |  8 +++-----
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/src/jobflow_remote/testing/__init__.py b/src/jobflow_remote/testing/__init__.py
index 53d40963..e3d6362a 100644
--- a/src/jobflow_remote/testing/__init__.py
+++ b/src/jobflow_remote/testing/__init__.py
@@ -45,7 +45,20 @@ def check_env_var() -> str:
 
 @job(big_files="data")
 def add_big(a: float, b: float):
-    """Adds two numbers together and writes the answer to an artificially large file."""
+    """Adds two numbers together and writes the answer to an artificially large file
+    which is stored in a pre-defined store."""
     import pathlib
 
     result = a + b
     with open("file.txt", "w") as f:
         f.writelines([f"{result}"] * int(1e5))
     return Response({"data": pathlib.Path("file.txt"), "result": a + b})
+
+
+@job(undefined_store="data")
+def add_big_undefined_store(a: float, b: float):
+    """Adds two numbers together and writes the answer to an artificially large file
+    which it attempts to store in an undefined store."""
+    import pathlib
+
+    result = a + b
+    with open("file.txt", "w") as f:
+        f.writelines([f"{result}"] * int(1e5))
+    return Response({"data": pathlib.Path("file.txt"), "result": a + b})
diff --git a/tests/integration/test_slurm.py b/tests/integration/test_slurm.py
index 53ef0b1f..5150fc79 100644
--- a/tests/integration/test_slurm.py
+++ b/tests/integration/test_slurm.py
@@ -292,8 +292,6 @@ def test_undefined_additional_stores(worker, job_controller):
     runner = Runner()
     runner.run(ticks=10)
 
-    # Probably this should error somewhere
-    doc = job_controller.get_jobs({})[0]
-    assert job_controller.count_jobs(state=JobState.COMPLETED) == 1
-    assert job_controller.count_flows(state=FlowState.COMPLETED) == 1
-    assert job_controller.jobstore.get_output(uuid=doc["job"]["uuid"])["result"] == 200
+    # The job should fail, as the additional store is not defined
+    assert job_controller.count_jobs(state=JobState.FAILED) == 1
+    assert job_controller.count_flows(state=FlowState.FAILED) == 1

From 92cb7408ec02a69e25561966e6baaaee50b428f1 Mon Sep 17 00:00:00 2001
From: Matthew Evans
Date: Mon, 19 Feb 2024 15:23:51 +0000
Subject: [PATCH 3/3] Add ability to detect missing configuration of
 additional stores

- Use inspect instead of wrapped annotations, and raise `RemoteError` when
  store is not present
- Tweak store tests
- Fix bad auto editor import
- Dial back the size of test data in the GridFS test

---
 src/jobflow_remote/remote/data.py      | 20 ++++++++++++++++++++
 src/jobflow_remote/testing/__init__.py | 14 +++++++-------
 tests/integration/conftest.py          |  2 +-
 tests/integration/test_slurm.py        |  7 +++----
 4 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/src/jobflow_remote/remote/data.py b/src/jobflow_remote/remote/data.py
index 15b80059..aa67e5f6 100644
--- a/src/jobflow_remote/remote/data.py
+++ b/src/jobflow_remote/remote/data.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import inspect
 import io
 import logging
 import os
@@ -7,12 +8,19 @@
 
 from typing import Any
 
 import orjson
+from jobflow.core.job import Job
 from jobflow.core.store import JobStore
 from maggma.stores.mongolike import JSONStore
 from monty.json import jsanitize
 
+from jobflow_remote.jobs.data import RemoteError
 from jobflow_remote.utils.data import uuid_to_path
 
+JOB_INIT_ARGS = {k for k in inspect.signature(Job).parameters.keys() if k != "kwargs"}
+"""A set of the arguments of the Job constructor, which
+can be used to detect additional custom arguments.
+"""
+
 
 def get_job_path(
     job_id: str, index: int | None, base_path: str | Path | None = None
@@ -165,4 +173,16 @@ def resolve_job_dict_args(job_dict: dict, store: JobStore) -> dict:
     # substitution is in place
     job_dict["function_args"] = resolved_args
     job_dict["function_kwargs"] = resolved_kwargs
+
+    additional_store_names = set(job_dict.keys()) - JOB_INIT_ARGS
+    for store_name in additional_store_names:
+        # Exclude MSON fields (e.g. "@module", "@class")
+        if store_name.startswith("@"):
+            continue
+        if store_name not in store.additional_stores:
+            raise RemoteError(
+                f"Additional store {store_name!r} is not configured for this project.",
+                no_retry=True,
+            )
+
     return job_dict
diff --git a/src/jobflow_remote/testing/__init__.py b/src/jobflow_remote/testing/__init__.py
index e3d6362a..9981ec95 100644
--- a/src/jobflow_remote/testing/__init__.py
+++ b/src/jobflow_remote/testing/__init__.py
@@ -43,16 +43,16 @@ def check_env_var() -> str:
     return os.environ.get("TESTING_ENV_VAR", "unset")
 
 
-@job(big_files="data")
+@job(big_data="data")
 def add_big(a: float, b: float):
-    """Adds two numbers together and writes the answer to an artificially large file
-    which is stored in a pre-defined store."""
-    import pathlib
+    """Adds two numbers together and inflates the answer
+    to a large list and tries to store that within
+    the defined store.
+ """ result = a + b - with open("file.txt", "w") as f: - f.writelines([f"{result}"] * int(1e5)) - return Response({"data": pathlib.Path("file.txt"), "result": a + b}) + big_array = [result] * 5_000 + return Response({"data": big_array, "result": a + b}) @job(undefined_store="data") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c38caf8a..112d7589 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -198,7 +198,7 @@ def write_tmp_settings( "collection_name": "docs", }, "additional_stores": { - "big_files": { + "big_data": { "type": "GridFSStore", "database": store_database_name, "host": "localhost", diff --git a/tests/integration/test_slurm.py b/tests/integration/test_slurm.py index 5150fc79..ed9adbe0 100644 --- a/tests/integration/test_slurm.py +++ b/tests/integration/test_slurm.py @@ -259,7 +259,7 @@ def test_additional_stores(worker, job_controller): runner.run(ticks=10) doc = job_controller.get_jobs({})[0] - fs = job_controller.jobstore.additional_stores["big_files"] + fs = job_controller.jobstore.additional_stores["big_data"] assert fs.count({"job_uuid": doc["job"]["uuid"]}) == 1 assert job_controller.count_jobs(state=JobState.COMPLETED) == 1 assert job_controller.count_flows(state=FlowState.COMPLETED) == 1 @@ -279,7 +279,7 @@ def test_undefined_additional_stores(worker, job_controller): from jobflow_remote import submit_flow from jobflow_remote.jobs.runner import Runner - from jobflow_remote.jobs.state import FlowState, JobState + from jobflow_remote.jobs.state import JobState from jobflow_remote.testing import add_big_undefined_store job = add_big_undefined_store(100, 100) @@ -293,5 +293,4 @@ def test_undefined_additional_stores(worker, job_controller): runner.run(ticks=10) # The job should fail, as the additional store is not defined - assert job_controller.count_jobs(state=JobState.FAILED) == 1 - assert job_controller.count_flows(state=FlowState.FAILED) == 1 + assert job_controller.count_jobs(state=JobState.REMOTE_ERROR) == 1