Skip to content

Commit

Permalink
Add ability to detect missing configuration of additional stores
Browse files Browse the repository at this point in the history
- Use inspect instead of wrapped annotations, and return `RemoteError` when store is not present

- Tweak store tests

- Fix bad auto editor import

- Dial back the size of test data in the GridFS test
  • Loading branch information
ml-evs committed Feb 22, 2024
1 parent d224cda commit 5da2c9e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
20 changes: 20 additions & 0 deletions src/jobflow_remote/remote/data.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
from __future__ import annotations

import inspect
import io
import logging
import os
from pathlib import Path
from typing import Any

import orjson
from jobflow.core.job import Job
from jobflow.core.store import JobStore
from maggma.stores.mongolike import JSONStore
from monty.json import jsanitize

from jobflow_remote.jobs.data import RemoteError
from jobflow_remote.utils.data import uuid_to_path

JOB_INIT_ARGS = set(inspect.signature(Job).parameters) - {"kwargs"}
"""Names of the explicit parameters of the ``Job`` constructor.

The catch-all ``kwargs`` parameter is excluded, so any key of a job
dict that is not in this set can be detected as an additional custom
argument.
"""


def get_job_path(
job_id: str, index: int | None, base_path: str | Path | None = None
Expand Down Expand Up @@ -165,4 +173,16 @@ def resolve_job_dict_args(job_dict: dict, store: JobStore) -> dict:
# substitution is in place
job_dict["function_args"] = resolved_args
job_dict["function_kwargs"] = resolved_kwargs

additional_store_names = set(job_dict.keys()) - JOB_INIT_ARGS
for store_name in additional_store_names:
# Exclude MSON fields
if store_name.startswith("@"):
continue
if store_name not in store.additional_stores:
raise RemoteError(
f"Additional store {store_name!r} is not configured for this project.",
no_retry=True,
)

return job_dict
14 changes: 7 additions & 7 deletions src/jobflow_remote/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ def check_env_var() -> str:
return os.environ.get("TESTING_ENV_VAR", "unset")


@job(big_files="data")
@job(big_data="data")
def add_big(a: float, b: float):
"""Adds two numbers together and writes the answer to an artificially large file
which is stored in a pre-defined store."""
import pathlib
"""Adds two numbers together and inflates the answer
to a large list and tries to store that within
the defined store.
"""
result = a + b
with open("file.txt", "w") as f:
f.writelines([f"{result}"] * int(1e5))
return Response({"data": pathlib.Path("file.txt"), "result": a + b})
big_array = [result] * 5_000
return Response({"data": big_array, "result": a + b})


@job(undefined_store="data")
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def write_tmp_settings(
"collection_name": "docs",
},
"additional_stores": {
"big_files": {
"big_data": {
"type": "GridFSStore",
"database": store_database_name,
"host": "localhost",
Expand Down
7 changes: 3 additions & 4 deletions tests/integration/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def test_additional_stores(worker, job_controller):
runner.run(ticks=10)

doc = job_controller.get_jobs({})[0]
fs = job_controller.jobstore.additional_stores["big_files"]
fs = job_controller.jobstore.additional_stores["big_data"]
assert fs.count({"job_uuid": doc["job"]["uuid"]}) == 1
assert job_controller.count_jobs(state=JobState.COMPLETED) == 1
assert job_controller.count_flows(state=FlowState.COMPLETED) == 1
Expand All @@ -279,7 +279,7 @@ def test_undefined_additional_stores(worker, job_controller):

from jobflow_remote import submit_flow
from jobflow_remote.jobs.runner import Runner
from jobflow_remote.jobs.state import FlowState, JobState
from jobflow_remote.jobs.state import JobState
from jobflow_remote.testing import add_big_undefined_store

job = add_big_undefined_store(100, 100)
Expand All @@ -293,5 +293,4 @@ def test_undefined_additional_stores(worker, job_controller):
runner.run(ticks=10)

# The job should fail, as the additional store is not defined
assert job_controller.count_jobs(state=JobState.FAILED) == 1
assert job_controller.count_flows(state=FlowState.FAILED) == 1
assert job_controller.count_jobs(state=JobState.REMOTE_ERROR) == 1

0 comments on commit 5da2c9e

Please sign in to comment.