Skip to content

Commit

Permalink
option to customize remote jobstore
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed Mar 4, 2024
1 parent d9cede6 commit a0721be
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 69 deletions.
9 changes: 7 additions & 2 deletions src/jobflow_remote/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,15 @@ class Project(BaseModel):
)
jobstore: dict = Field(
default_factory=lambda: dict(DEFAULT_JOBSTORE),
description="The JobStore used for the input. Can contain the monty "
"serialized dictionary or the Store int the Jobflow format",
description="The JobStore used for the output. Can contain the monty "
"serialized dictionary or the Store in the Jobflow format",
validate_default=True,
)
remote_jobstore: Optional[dict] = Field(
None,
        description="The JobStore used for the data transfer between the Runner "
        "and the workers. Can be a string with the standard values",
)
metadata: Optional[dict] = Field(
None, description="A dictionary with metadata associated to the project"
)
Expand Down
4 changes: 3 additions & 1 deletion src/jobflow_remote/jobs/jobcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2711,7 +2711,9 @@ def complete_job(
self.update_flow_state(host_flow_id)
return True

remote_store = get_remote_store(store, local_path)
remote_store = get_remote_store(
store, local_path, self.project.remote_jobstore
)

update_store(store, remote_store, job_doc["db_id"])

Expand Down
25 changes: 10 additions & 15 deletions src/jobflow_remote/jobs/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@

from jobflow_remote.jobs.batch import LocalBatchManager
from jobflow_remote.jobs.data import IN_FILENAME, OUT_FILENAME
from jobflow_remote.remote.data import (
default_orjson_serializer,
get_job_path,
get_remote_store_filenames,
)
from jobflow_remote.remote.data import get_job_path, get_store_file_paths
from jobflow_remote.utils.log import initialize_remote_run_log

logger = logging.getLogger(__name__)
Expand All @@ -43,24 +39,23 @@ def run_remote_job(run_dir: str | Path = "."):
job: Job = in_data["job"]
store = in_data["store"]

# needs to be set here again since it does not get properly serialized.
# it is possible to serialize the default function before serializing, but
# avoided that to avoid that any refactoring of the
# default_orjson_serializer breaks the deserialization of old Fireworks
store.docs_store.serialization_default = default_orjson_serializer
for additional_store in store.additional_stores.values():
additional_store.serialization_default = default_orjson_serializer

store.connect()

initialize_logger()
try:
response = job.run(store=store)
finally:
# some jobs may have compressed the FW files while being executed,
# try to decompress them if that is the case.
# try to decompress them if that is the case and files need to be
# decompressed.
decompress_files(store)

# Close the store explicitly, as minimal stores may require it.
try:
store.close()
except Exception:
logger.error("Error while closing the store", exc_info=True)

# The output of the response has already been stored in the store.
response.output = None

Expand Down Expand Up @@ -161,7 +156,7 @@ def run_batch_jobs(

def decompress_files(store: JobStore):
file_names = [OUT_FILENAME]
file_names.extend(get_remote_store_filenames(store))
file_names.extend(os.path.basename(p) for p in get_store_file_paths(store))

for fn in file_names:
# If the file is already present do not decompress it, even if
Expand Down
8 changes: 6 additions & 2 deletions src/jobflow_remote/jobs/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def upload(self, lock: MongoLock):
# serializer could undergo refactoring and this could break deserialization
# of older FWs. It is set in the FireTask at runtime.
remote_store = get_remote_store(
store=store, launch_dir=remote_path, add_orjson_serializer=False
store=store, work_dir=remote_path, config_dict=self.project.remote_jobstore
)

created = host.mkdir(remote_path)
Expand Down Expand Up @@ -583,7 +583,11 @@ def download(self, lock):
makedirs_p(local_path)

fnames = [OUT_FILENAME]
fnames.extend(get_remote_store_filenames(store))
fnames.extend(
get_remote_store_filenames(
store, config_dict=self.project.remote_jobstore
)
)

for fname in fnames:
# in principle fabric should work by just passing the
Expand Down
Loading

0 comments on commit a0721be

Please sign in to comment.