diff --git a/doc/source/_static/code/project_simple.yaml b/doc/source/_static/code/project_simple.yaml index d73335ab..513ccfb0 100644 --- a/doc/source/_static/code/project_simple.yaml +++ b/doc/source/_static/code/project_simple.yaml @@ -9,12 +9,13 @@ workers: host: remote.host.net user: bob queue: - type: MongoStore - host: localhost - database: db_name - username: bob - password: secret_password - collection_name: jobs + store: + type: MongoStore + host: localhost + database: db_name + username: bob + password: secret_password + collection_name: jobs exec_config: {} jobstore: docs_store:
diff --git a/doc/source/_static/img/project_erdantic.png b/doc/source/_static/img/project_erdantic.png index b8fc3fbe..82adce0c 100644 Binary files a/doc/source/_static/img/project_erdantic.png and b/doc/source/_static/img/project_erdantic.png differ
diff --git a/doc/source/conf.py b/doc/source/conf.py index 3f9e254c..759ad05b 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -61,6 +61,7 @@ "sphinx_design", "sphinx_copybutton", "sphinxcontrib.autodoc_pydantic", + "sphinxcontrib.mermaid", ] # Add any paths that contain templates here, relative to this directory.
diff --git a/doc/source/user/index.rst b/doc/source/user/index.rst index baddba9b..ec7c92ed 100644 --- a/doc/source/user/index.rst +++ b/doc/source/user/index.rst @@ -15,6 +15,8 @@ details are found in :ref:`reference`. install projectconf quickstart + tuning + states .. toctree:: :caption: Advanced usage and interoperability
diff --git a/doc/source/user/install.rst b/doc/source/user/install.rst index cfbc7bf2..a1871f45 100644 --- a/doc/source/user/install.rst +++ b/doc/source/user/install.rst @@ -182,7 +182,7 @@ Jobstore -------- The ``jobstore`` used for ``jobflow``. Its definition is equivalent to the one used in -``jobflow``'s configuration file. See `Jobflows documentation `_ +``jobflow``'s configuration file. See `Jobflow's documentation `_ for more details. It can be the same as in the :ref:`queue simple config` or a different one. Check
diff --git a/doc/source/user/projectconf.rst b/doc/source/user/projectconf.rst index 46a24d26..b14a0bdc 100644 --- a/doc/source/user/projectconf.rst +++ b/doc/source/user/projectconf.rst @@ -1,15 +1,21 @@ .. _projectconf: -********************** -Projects configuration -********************** +*********************************** +Projects configuration and Settings +*********************************** -Jobflow-remote allows to handle multiple configurations, defined projects. Since +Jobflow-remote allows handling multiple configurations, defined as **projects**. Since for most of the users a single project is enough let us first consider the configuration -of a single project. The handling of multiple projects will be described below. +of a single project. The handling of :ref:`projectconf multi` will be described below. -The configurations allow to control the behaviour of the Job execution, as well as -the other objects in jobflow-remote. Here a full description of the project's +Aside from the project options, a set of :ref:`projectconf general` can also be +configured through environment variables or an additional configuration file. + +Project options +=============== + +The project configurations allow controlling the behaviour of the Job execution, as well +as of the other objects in jobflow-remote. Here a full description of the project's configuration file will be given.
If you are looking for a minimal example with its description you can find it in the :ref:`minimal project config` section. @@ -32,8 +38,7 @@ section below, while an example for a full configuration file can be generated r Note that, while the default file format is YAML, JSON and TOML are also acceptable format. You can generate the example in the other formats using the ``--format`` option. -Project options -=============== +
Name and folders ---------------- @@ -53,6 +58,8 @@ For all these folders the paths are set with defaults, but can be customised set The project name does not take into consideration the configuration file name. For coherence it would be better to give use the project name as file name. +.. _projectconf worker: +
Workers ------- @@ -72,18 +79,118 @@ type all the credentials to connect automatically should be provided. The best option would be to set up a passwordless connection and define it in the ``~/.ssh/config`` file. -The other key property of the workers is the ``scheduler_type``. +The other key property of the workers is the ``scheduler_type``. It can be any of the +values supported by the `qtoolkit `_. Typical +values are: + +* ``shell``: the Job is executed directly in the shell. No queue will be used. + If not limited, all the Jobs can be executed simultaneously. +* ``slurm``, ``pbs``, ...: the name of a queueing system. The job will be submitted + to the queue with the selected resources. + +Another mandatory argument is ``work_dir``, indicating the full path for a folder +on the worker machine where the Jobs will actually be executed. + +It is possible to optionally set default values for keywords like ``pre_run`` +and ``resources``, which can be overridden for individual Jobs. Note that these +configurations will be applied to *all* the Jobs executed by the worker. These +are thus more suitable for generic settings (e.g. the activation of a python +environment, or loading of some modules), rather than for code-specific +configurations, which are better set with the :ref:`projectconf execconfig`. .. note:: If a single worker is defined it will be used as default in the submission of new Flows.
+JobStore +-------- + +The ``jobstore`` value contains a dictionary representation of the standard +``JobStore`` object defined in jobflow. It can either be the serialized +version as obtained by the ``as_dict`` method or the representation defined +in `jobflow's documentation `_. + +This ``JobStore`` will be used to store the outputs of all the Jobs executed +in this project. + +.. note:: + + The ``JobStore`` should be defined in jobflow-remote's configuration file. + The content of the standard jobflow configuration file will be ignored.
+Queue Store +----------- + +The ``queue`` element contains the definition of the database containing the +state of the Jobs and Flows. The subelement ``store`` should contain the +representation of a `maggma `_ ``Store``. +As for the ``JobStore`` it can be either its serialization or the same kind +of representation used for the ``docs_store`` in jobflow's configuration file. + +The collection defined by the ``Store`` will contain the information about the +state of the Jobs, while two more collections will be created. The name +of these two collections can also be customized. + +.. warning:: + + The queue ``Store`` should be a subclass of the ``MongoStore`` and currently + it should be based on a real MongoDB (e.g. not a ``JSONStore``).
+ Some key operations required by jobflow-remote on the collections are not + supported by any file-based MongoDB implementation at the moment. +
+.. _projectconf execconfig: + +Execution configurations +------------------------ + +It is possible to define a set of ``ExecutionConfig`` objects to quickly set up +configurations for different kinds of Jobs and Flows. The ``exec_config`` key +contains a dictionary where the keys are the names associated with the configurations +and the values define a set of instructions to be executed before and after the execution of the Job. +
+Runner options +-------------- + +The behaviour of the ``Runner`` can also be customized to some extent. In particular +the ``Runner`` implements an exponential backoff mechanism for retrying when an +operation updating a Job's state fails. The number of attempts and the delay between +them can be set with the ``max_step_attempts`` and ``delta_retry`` values. In addition some +reasonable values are set for the delay between each check of the database for the +different kinds of actions performed by the ``Runner``. These intervals can be +changed to better fit your needs. Keep in mind that reducing these intervals too much +may put unnecessary strain on the database. +
+Metadata +-------- + +While this currently does not play any role in the execution of jobflow-remote, +it can be used to include some additional information to be used by external +tools or to quickly distinguish a configuration file among others. +
+.. _projectconf multi: Multiple Projects ================= -asdsd +While a single project can be enough for most of the users and for beginners, +it may be convenient to define different databases, configurations and python +environments to work on different topics. For this reason jobflow-remote will +consider as potential project configurations all the YAML, JSON and TOML files +in the ``~/.jfremote`` folder. There is no additional procedure required to +add or remove a project, aside from creating/deleting a project configuration file. + +If more than one project is present and a specific one is not selected, the +code will always stop and ask for a project to be specified. Python functions +like ``submit_flow`` and ``get_jobstore`` accept a ``project`` argument to +specify which project should be considered. For the command line interface +a general ``-p`` option allows selecting a project for the command that is being +executed:: + + jf -p another_project job list + +To define a default project for all the functions and commands executed on the +system or in a specific cell see the :ref:`projectconf general` section. .. _project detailed specs: @@ -92,3 +199,31 @@ Project specs .. raw:: html :file: ../_static/project_schema.html + +.. _projectconf general: + +General Settings +================ + +Aside from the project-specific configuration, a few options can also be +defined in general. There are two ways to set these options: + +* set the value in the ``~/.jfremote.yaml`` configuration file. +* export the variable name prepended with the ``jfremote_`` prefix:: + + export jfremote_project=project_name + +.. note:: + + The name of the exported variables is case-insensitive (i.e. JFREMOTE_PROJECT + is equally valid). + +The most useful variable to set is the ``project`` one, allowing selection of the +default project to be used in a multi-project environment. + +Other generic options are the location of the projects folder to use instead of +``~/.jfremote`` (``projects_folder``) and the path to the ``~/.jfremote.yaml`` +file itself (``config_file``).
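+
+As an illustrative sketch (using the same placeholder project name as above),
+a minimal ``~/.jfremote.yaml`` selecting the default project could look like:
+
+.. code-block:: yaml
+
+    # default project used when no -p option or project argument is given
+    project: another_project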
+ +Some customization options are also available for the behaviour of the CLI. +For more details see the API documentation :py:class:`jobflow_remote.config.settings.JobflowRemoteSettings`.
diff --git a/doc/source/user/quickstart.rst b/doc/source/user/quickstart.rst index 4151633d..eb17ba61 100644 --- a/doc/source/user/quickstart.rst +++ b/doc/source/user/quickstart.rst @@ -56,9 +56,14 @@ This code will print an integer unique id associated to the submitted ``Job`` s. On the worker selection: * The worker should match the name of one of the workers defined in the project. * In this way all the ``Job`` s will be assigned to the same worker. - * If only one worker is defined, the argument can be omitted. + * If the argument is omitted, the first worker in the project configuration is used. * In any case the worker is determined when the ``Job`` is inserted in the database. +.. warning:: + + Once the flow has been submitted to the database, any further change to the + ``Flow`` object will not be taken into account. + It is now possible to use the ``jf`` command line interface (CLI):: jf job list @@ -127,6 +132,11 @@ done before:: │ 1 │ add │ COMPLETED │ ae020c67-72f0-4805-858e-fe48644e4bb0 (1) │ local_shell │ 2023-12-19 16:44 │ └───────┴──────┴───────────┴───────────────────────────────────────────┴─────────────┴────────────────────┘ +.. note:: + + The ``Runner`` checks the states of the Jobs at regular intervals. A few seconds may + be required before a change in the Job state becomes visible. + The ``Runner`` will keep checking the database for the submission of new Jobs and will update the state of each Job as soon as the previous action is completed. If you plan to keep submitting workflows you can keep the daemon running, otherwise @@ -139,6 +149,16 @@ you can stop the process with:: By default the daemon will spawn several processes, each taking care of some of the actions listed above. +.. warning:: + + The ``stop`` command will send a ``SIGTERM`` signal to the ``Runner`` processes, which + will terminate the action currently being performed before actually stopping. This should + prevent the presence of inconsistent states in the database. + However, if you believe the ``Runner`` is stuck or you need to halt the ``Runner`` immediately + you can kill the processes with:: + + jf runner kill + Results =======
diff --git a/doc/source/user/states.rst b/doc/source/user/states.rst new file mode 100644 index 00000000..3bb14740 --- /dev/null +++ b/doc/source/user/states.rst @@ -0,0 +1,234 @@ +.. _states: + +****** +States +****** + + +Job States +********** + +During its execution by the ``Runner``, a Job can reach different states. +Each of the states describes the current status of the Job after the ``Runner`` +has finished switching it from one state to another. + +Since the ``Runner`` can be stopped and will update the different states at +predefined intervals, it may be that the state does not reflect the +actual situation of a Job (for example it could be that the process of a +Job in the ``RUNNING`` state has finished, but the ``Runner`` did not +update its state yet). + +Description +=========== + +The states can be grouped in: + +* Waiting states: describing Jobs that have not started yet. +* Running states: states for which the ``Runner`` has started working on the Job. +* Completed state: a state where the Job has been completed successfully. +* Error states: states associated with some error in the Job, either programmatic + or during the execution.
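+
+As an illustrative sketch (the project name is a placeholder), the Jobs in a given
+state can be inspected programmatically through the ``JobController``, for example
+to list all the Jobs currently in the ``RUNNING`` state:
+
+.. code-block:: python
+
+    from jobflow_remote.jobs.jobcontroller import JobController
+    from jobflow_remote.jobs.state import JobState
+
+    # build a controller for a specific project (name is a placeholder)
+    jc = JobController.from_project_name("my_project")
+
+    # query the queue database for Jobs in the RUNNING state
+    for info in jc.get_jobs_info(state=JobState.RUNNING):
+        print(info.db_id, info.name, info.state)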
+ +The list of Job states is defined in the :py:class:`jobflow_remote.jobs.state.JobState` +object. Here we present a list of each state with a short description. +
+WAITING +------- + +Waiting state. A Job that has been inserted into the database but has +to wait for other Jobs to be completed before starting. + +READY +----- + +Waiting state. A Job that is ready to be executed by the ``Runner``. + +CHECKED_OUT +----------- + +Running state. A Job that has been selected by the ``Runner`` to +start its execution. + +UPLOADED +-------- + +Running state. All the inputs required by the Job have been copied +to the worker. + +SUBMITTED +--------- + +Running state. The Job has been submitted to the queueing +system of the worker. + +RUNNING +------- + +Running state. The ``Runner`` verified that the Job has started and is being +executed on the worker. + +TERMINATED +---------- + +Running state. The process executing the Job on the worker has finished +running. No knowledge of whether this happened because of an error or because +the Job was completed correctly is available at this point. + +DOWNLOADED +---------- + +Running state. The ``Runner`` has copied to the local machine all the +files containing the Job response and outputs to be stored. + +COMPLETED +--------- + +Completed state. A Job that has completed correctly. + +FAILED +------ + +Error state. The procedure to execute the Job completed correctly, but +an error happened during the execution of the Job's function, so the +Job did not complete successfully. + +REMOTE_ERROR +------------ + +Error state. An error occurred during the procedure to execute the Job. +For example the files could not be copied due to some network error and +the maximum number of attempts has been reached. The Job may or may not +be executed, depending on the action that generated the issue, but in +any case no information is available about it. This failure is independent +of the correct execution of the Job's function. + +PAUSED +------ + +Waiting state. The Job has been paused by the user and will not be +executed by the ``Runner``. A Job in this state can be started again. + +STOPPED +------- + +Error state. The Job was stopped by another Job as a consequence of a +``stop_jobflow`` or ``stop_children`` action in the Job's response. +This state cannot be modified. + +USER_STOPPED +------------ + +Error state. A Job stopped by the user. This state cannot be modified. + +BATCH_SUBMITTED +--------------- + +Running state. A Job submitted for execution to a batch worker. Differs +from the ``SUBMITTED`` state since the ``Runner`` does not have to +check its state in the queueing system. + +BATCH_RUNNING +--------------- + +Running state. A Job that is being executed by a batch worker. Differs +from the ``RUNNING`` state since the ``Runner`` does not have to +check its state in the queueing system. + + +Evolution +========= + +If the state of a Job is not directly modified by the user, the ``Runner`` +will consistently update the state of each Job in a running state. + +The following diagram illustrates which state transitions can +be performed by the ``Runner`` on a Job. This includes the transitions +to intermediate or final error states. +
+.. mermaid:: + + stateDiagram-v2 + WAITING --> READY + READY --> CHECKED_OUT + CHECKED_OUT --> UPLOADED + UPLOADED --> SUBMITTED + SUBMITTED --> RUNNING + RUNNING --> TERMINATED + SUBMITTED --> TERMINATED + TERMINATED --> DOWNLOADED + DOWNLOADED --> COMPLETED + DOWNLOADED --> FAILED + + READY --> REMOTE_ERROR + CHECKED_OUT --> REMOTE_ERROR + UPLOADED --> REMOTE_ERROR + SUBMITTED --> REMOTE_ERROR + RUNNING --> REMOTE_ERROR + TERMINATED --> REMOTE_ERROR + DOWNLOADED --> REMOTE_ERROR + + + + classDef error fill:#E62A2A,color:white + classDef running fill:#2a48e6,color:white + classDef success fill:#289e21,color:white + classDef ready fill:#8be485 + classDef wait fill:#eae433 + + class REMOTE_ERROR,FAILED error + class CHECKED_OUT,UPLOADED,SUBMITTED,RUNNING,TERMINATED,DOWNLOADED running + class COMPLETED success + class READY ready + class WAITING wait +
+Flow states +*********** + +Each Flow in the database also has a global state. This is a function of +the states of each of the Jobs included in the Flow. As for the Jobs, +the Flow states can change due to the action of the ``Runner`` +or of the user. + +Description +=========== + +The list of Flow states is simplified compared to the Job states, since several +Job states will be grouped under a single Flow state. + +The list of Flow states is defined in the :py:class:`jobflow_remote.jobs.state.FlowState` +object. Here we present a list of each state with a short description. + +READY +----- + +There is at least one Job in the READY state. No Jobs have started or have failed. + +RUNNING +------- + +At least one of the Jobs is being or has been executed. The state will not be +changed if a single Job completes, but there are still other Jobs to be executed. + +COMPLETED +--------- + +All the leaf Jobs of the Flow are in the ``COMPLETED`` state. This means that some +intermediate Job may be in the ``FAILED`` state, but its children are set to +not give an error in the ``on_missing_references`` of the ``JobConfig``. + +FAILED +------ + +At least one of the Jobs failed and the Flow is not ``COMPLETED``. + +STOPPED +------- + +At least one of the Jobs is in the ``STOPPED`` or the ``USER_STOPPED`` state +and the flow is not in one of the previous states. + +PAUSED +------ + +At least one of the Jobs is in the ``PAUSED`` state and the flow is not in one +of the previous states.
diff --git a/doc/source/user/tuning.rst b/doc/source/user/tuning.rst index 282de0aa..53a8f86f 100644 --- a/doc/source/user/tuning.rst +++ b/doc/source/user/tuning.rst @@ -3,3 +3,152 @@ ******************** Tuning Job execution ******************** + +Jobs with time-consuming calculations require properly configuring +the environment and the resources used to execute them. This +section focuses on which options can be tuned and the ways available +in jobflow-remote to change them. + +Tuning options +============== + +Worker +------ + +A worker is a computational unit that will actually execute the function +inside a Job. The list of workers is given in the :ref:`projectconf worker` +project configuration. + +Workers are set by the name used to define them in the project and a worker +should always be defined for each Job when adding a Flow to the database. + +.. note:: + + A single worker does not necessarily need to be identified with a computational + resource as a whole. Different workers referring, for example, to the same + HPC center, but with different configurations, can be created. + The ``Runner`` will still open a single connection if the host is the same.
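+
+As an illustrative sketch (worker names, host and paths are placeholders, and only
+a few of the available options are shown; see the :ref:`project detailed specs` for
+the full list), two workers sharing the same host but using different default
+resources could be defined in the project file as:
+
+.. code-block:: yaml
+
+    workers:
+      hpc_debug:
+        type: remote
+        scheduler_type: slurm
+        host: remote.host.net
+        work_dir: /path/to/scratch
+        resources:
+          partition: debug
+      hpc_production:
+        type: remote
+        scheduler_type: slurm
+        host: remote.host.net
+        work_dir: /path/to/scratch
+        resources:
+          partition: batch
+
+A Job can then be assigned to either worker at submission time, as shown in the
+sections below.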
+ +Execution configuration +----------------------- + +An execution configuration, represented as an ``ExecutionConfig`` object in +the code, contains information to run additional commands before and after +the execution of a Job. + +These are typically used to define the modules to load on an HPC center, +the specific python environment to load, or to set the ``PATH`` for some executable +needed by the Job. + +They can usually be given as a string referring to the setting defined in the +project configuration file, or as an instance of ``ExecutionConfig``. +
+Resources +--------- + +If the worker executing the Job runs under the control of a queueing system +(e.g. SLURM, PBS), it is also important to specify which resources need to +be allocated when running a Job. + +Since all the operations involving the queueing system are handled with +`qtoolkit `_, jobflow-remote +supports the same functionalities. In particular it is either possible to +pass a dictionary containing the keywords specific to the selected queueing system +or to pass an instance of a ``QResources``, a generic object defining resources +for standard use cases. These will be used to fill in a template and generate +a suitable submission script. +
+How to tune +=========== + +Different ways of setting the worker, execution configuration and resources +for each Job are available. A combination of them can be used to ease the +configuration for all the Jobs. + +.. note:: + + If not defined otherwise, Jobs generated dynamically will inherit + the configuration of the Job that generated them. +
+Submission +---------- + +The first entry point to customize the execution of the Jobs in a Flow +is to use the arguments in the ``submit_flow`` function. + +.. code-block:: python + + resources = {"nodes": 1, "ntasks": 4, "partition": "batch"} + submit_flow( + flow, worker="local_shell", exec_config="somecode_v.x.y", resources=resources + ) + +This will set the passed values for all the Jobs for which they have not been +previously set. + +.. warning:: + + Once the flow has been submitted to the database, any further change to the + ``Flow`` object will not be taken into account. +
+JobConfig +--------- + +Each jobflow ``Job`` has a ``JobConfig`` attribute. This can be used to store +a ``manager_config`` dictionary with configuration specific to that Job. + +This can be done with the ``set_run_config`` function, which targets Jobs +based on their name or on the callable they are wrapping. Consider the +following example: + +.. code-block:: python + + from jobflow_remote.utils.examples import add, value + from jobflow_remote import submit_flow, set_run_config + from jobflow import Flow + + job1 = value(5) + job2 = add(job1.output, 2) + + flow = Flow([job1, job2]) + + flow = set_run_config( + flow, name_filter="add", worker="secondw", exec_config="anotherconfig" + ) + + resources = {"nodes": 1, "ntasks": 4, "partition": "batch"} + submit_flow(flow, worker="firstw", exec_config="somecode_v.x.y", resources=resources) + +After being submitted to the database the ``value`` Job will be executed +on the ``firstw`` worker, while the ``add`` Job will be executed on the +``secondw`` worker. + +In addition, since ``set_run_config`` makes use of jobflow's ``update_config`` +method, these updates will also be automatically applied to any new Job +dynamically generated in the Flow. + +.. warning:: + + The ``name_filter`` matches any name containing the string passed.
+ So using a ``name_filter=add`` will match both a job named ``add`` + and one named ``add more``. + + +CLI +--- + +After a Job has been added to the database, it is still possible to change +its settings. This can be achieved with the ``jf job set`` CLI command. +For example running:: + + jf job set worker -did 8 example_worker + +sets the worker for the Job with DB id 8 to ``example_worker``. Similarly, +the ``jf job set resources`` and ``jf job set exec-config`` commands can be used +to set the values of the resources and execution configurations. + +.. note:: + + In order for this to be meaningful, only Jobs that have not been started + can be modified. So these commands can be applied only to Jobs in the + ``READY`` or ``WAITING`` states.
diff --git a/pyproject.toml b/pyproject.toml index 0dd980b5..d7e32a54 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,7 @@ docs = [ "pydata-sphinx-theme", "sphinx-copybutton", "autodoc_pydantic>=2.0.0", + "sphinxcontrib-mermaid" ] strict = []
diff --git a/src/jobflow_remote/__init__.py b/src/jobflow_remote/__init__.py index 31ffb336..dbb67772 100644 --- a/src/jobflow_remote/__init__.py +++ b/src/jobflow_remote/__init__.py @@ -1,6 +1,7 @@ """jobflow-remote is a python package to run jobflow workflows on remote resources""" from jobflow_remote._version import __version__ +from jobflow_remote.config.jobconfig import set_run_config from jobflow_remote.config.manager import ConfigManager from jobflow_remote.config.settings import JobflowRemoteSettings from jobflow_remote.jobs.jobcontroller import JobController
diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py index f6a50464..d9a678cf 100644 --- a/src/jobflow_remote/cli/job.py +++ b/src/jobflow_remote/cli/job.py @@ -429,7 +429,7 @@ def play( @app_job.command() -def cancel( +def stop( job_db_id: job_db_id_arg = None, job_index: job_index_arg = None, job_id: job_ids_indexes_opt = None, @@ -448,7 +448,7 @@ def cancel( raise_on_error: raise_on_error_opt = False, ): """ - Cancel a Job. Only Jobs that did not complete or had an error can be cancelled. + Stop a Job. Only Jobs that did not complete or had an error can be stopped. The operation is irreversible. If possible, the associated job submitted to the remote queue will be cancelled. """ @@ -458,8 +458,8 @@ def cancel( jc = get_job_controller() execute_multi_jobs_cmd( - single_cmd=jc.cancel_job, - multi_cmd=jc.cancel_jobs, + single_cmd=jc.stop_job, + multi_cmd=jc.stop_jobs, job_db_id=job_db_id, job_index=job_index, job_ids=job_id, @@ -703,7 +703,7 @@ def resources( raise_on_error: raise_on_error_opt = False, ): """ - Set the worker for the selected Jobs. Only READY or WAITING Jobs. + Set the resources for the selected Jobs. Only READY or WAITING Jobs.
""" resources_value = str_to_dict(resources_value) diff --git a/src/jobflow_remote/cli/types.py b/src/jobflow_remote/cli/types.py index 48e3535d..35c43f41 100644 --- a/src/jobflow_remote/cli/types.py +++ b/src/jobflow_remote/cli/types.py @@ -34,7 +34,7 @@ db_ids_opt = Annotated[ - Optional[List[int]], + Optional[List[str]], typer.Option( "--db-id", "-did", diff --git a/src/jobflow_remote/cli/utils.py b/src/jobflow_remote/cli/utils.py index 7d5b76fa..9e0b303c 100644 --- a/src/jobflow_remote/cli/utils.py +++ b/src/jobflow_remote/cli/utils.py @@ -179,14 +179,13 @@ def loading_spinner(processing: bool = True): yield progress -def get_job_db_ids(job_db_id: str | int, job_index: int | None): - try: - db_id = int(job_db_id) - job_id = None - except ValueError: +def get_job_db_ids(job_db_id: str, job_index: int | None): + if check_valid_uuid(job_db_id, raise_on_error=False): db_id = None job_id = job_db_id - check_valid_uuid(job_id) + else: + db_id = job_db_id + job_id = None if job_index and db_id is not None: out_console.print( @@ -239,15 +238,18 @@ def wrapper(*args, **kwargs): return wrapper -def check_valid_uuid(uuid_str): +def check_valid_uuid(uuid_str, raise_on_error: bool = True) -> bool: try: uuid_obj = uuid.UUID(uuid_str) if str(uuid_obj) == uuid_str: - return + return True except ValueError: pass - raise typer.BadParameter(f"UUID {uuid_str} is in the wrong format.") + if raise_on_error: + raise typer.BadParameter(f"UUID {uuid_str} is in the wrong format.") + else: + return False def str_to_dict(string: str | None) -> dict | None: @@ -291,10 +293,10 @@ def get_start_date(start_date: datetime | None, days: int | None, hours: int | N def execute_multi_jobs_cmd( single_cmd: Callable, multi_cmd: Callable, - job_db_id: str | int | None = None, + job_db_id: str | None = None, job_index: int | None = None, job_ids: list[str] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, diff --git a/src/jobflow_remote/config/base.py b/src/jobflow_remote/config/base.py index e920bc94..d54d2406 100644 --- a/src/jobflow_remote/config/base.py +++ b/src/jobflow_remote/config/base.py @@ -128,6 +128,7 @@ class BatchConfig(BaseModel): None, description="Maximum time after which a job will not submit more jobs (seconds). To help avoid hitting the walltime", ) + model_config = ConfigDict(extra="forbid") class WorkerBase(BaseModel): @@ -385,6 +386,54 @@ class ExecutionConfig(BaseModel): model_config = ConfigDict(extra="forbid") +class QueueConfig(BaseModel): + store: dict = Field( + default_factory=dict, + description="Dictionary describing a maggma Store used for the queue data. " + "Can contain the monty serialized dictionary or a dictionary with a 'type' " + "specifying the Store subclass. Should be subclass of a MongoStore, as it " + "requires to perform MongoDB actions. The collection is used to store the " + "jobs", + validate_default=True, + ) + flows_collection: str = Field( + "flows", + description="The name of the collection containing information about the flows. " + "Taken from the same database as the one defined in the store", + ) + auxiliary_collection: str = Field( + "jf_auxiliary", + description="The name of the collection containing auxiliary information. 
" + "Taken from the same database as the one defined in the store", + ) + db_id_prefix: Optional[str] = Field( + None, + description="a string defining the prefix added to the integer ID associated " + "to each Job in the database", + ) + + @field_validator("store") + def check_store(cls, store: dict) -> dict: + """ + Check that the queue configuration could be converted to a Store. + """ + if store: + try: + deserialized_store = store_from_dict(store) + except Exception as e: + raise ValueError( + f"error while converting queue to a maggma store. Error: {traceback.format_exc()}" + ) from e + if not isinstance(deserialized_store, MongoStore): + raise ValueError( + "The queue store should be a subclass of a " + f"MongoStore: {deserialized_store.__class__} instead" + ) + return store + + model_config = ConfigDict(extra="forbid") + + class Project(BaseModel): """ The configurations of a Project. @@ -422,13 +471,9 @@ class Project(BaseModel): description="A dictionary with the worker name as keys and the worker " "configuration as values", ) - queue: dict = Field( - default_factory=dict, - description="Dictionary describing a maggma Store used for the queue data. " - "Can contain the monty serialized dictionary or a dictionary with a 'type' " - "specifying the Store subclass. Should be subclass of a MongoStore, as it " - "requires to perform MongoDB actions.", - validate_default=True, + queue: QueueConfig = Field( + description="The configuration of the Store used to store the states of" + "the Jobs and the Flows", ) exec_config: dict[str, ExecutionConfig] = Field( default_factory=dict, @@ -468,7 +513,7 @@ def get_queue_store(self): ------- A maggma Store """ - return store_from_dict(self.queue) + return store_from_dict(self.queue.store) def get_job_controller(self): from jobflow_remote.jobs.jobcontroller import JobController @@ -530,22 +575,6 @@ def check_jobstore(cls, jobstore: dict) -> dict: ) from e return jobstore - @field_validator("queue") - def check_queue(cls, queue: dict) -> dict: - """ - Check that the queue configuration could be converted to a Store. - """ - if queue: - try: - store = store_from_dict(queue) - except Exception as e: - raise ValueError( - f"error while converting queue to a maggma store. 
Error: {traceback.format_exc()}" - ) from e - if not isinstance(store, MongoStore): - raise ValueError("The queue store should be a subclass of a MongoStore") - return queue - model_config = ConfigDict(extra="forbid") diff --git a/src/jobflow_remote/config/helper.py b/src/jobflow_remote/config/helper.py index 063e1af5..68dd6863 100644 --- a/src/jobflow_remote/config/helper.py +++ b/src/jobflow_remote/config/helper.py @@ -27,7 +27,7 @@ def generate_dummy_project(name: str, full: bool = False) -> Project: workers["example_local"] = local_worker exec_config = {"example_config": generate_dummy_exec_config()} - queue = generate_dummy_queue() + queue = {"store": generate_dummy_queue()} jobstore = generate_dummy_jobstore() diff --git a/src/jobflow_remote/config/jobconfig.py b/src/jobflow_remote/config/jobconfig.py index 97452f5b..ac1e5c05 100644 --- a/src/jobflow_remote/config/jobconfig.py +++ b/src/jobflow_remote/config/jobconfig.py @@ -15,19 +15,54 @@ def set_run_config( function_filter: Callable = None, exec_config: str | ExecutionConfig | None = None, resources: dict | QResources | None = None, -): - if not exec_config and not resources: - return + worker: str | None = None, +) -> Flow | Job: + """ + Modify in place a Flow or a Job by setting the properties in the + "manager_config" entry in the JobConfig associated to each Job + matching the filter. Uses the Flow/Job update_config() method, + so follows the same conventions, also setting the options in + the config_updates of the Job, to allow setting the same properties + also in dynamically generated Jobs. + + Parameters + ---------- + flow_or_job + A Flow or a Job to be modified + name_filter + A filter for the job name. Only jobs with a matching name will be updated. + Includes partial matches, e.g. "ad" will match a job with the name "adder". + function_filter + A filter for the job function. Only jobs with a matching function will be + updated. + exec_config + The execution configuration to be added to the selected Jobs. + resources + The resources to be set for the selected Jobs. + worker + The worker where the selected Jobs will be executed. + + Returns + ------- + Flow or Job + The modified object. 
+ """ + if not exec_config and not resources and not worker: + return flow_or_job config: dict = {"manager_config": {}} if exec_config: config["manager_config"]["exec_config"] = exec_config if resources: config["manager_config"]["resources"] = resources + if worker: + config["manager_config"]["worker"] = worker flow_or_job.update_config( config=config, name_filter=name_filter, function_filter=function_filter ) + return flow_or_job + def load_job_store(project: str | None = None) -> JobStore: """ diff --git a/src/jobflow_remote/config/manager.py b/src/jobflow_remote/config/manager.py index 25ea6bea..b233d29b 100644 --- a/src/jobflow_remote/config/manager.py +++ b/src/jobflow_remote/config/manager.py @@ -154,7 +154,10 @@ def get_project_data(self, project_name: str | None = None) -> ProjectData: project_name = self.select_project_name(project_name) if project_name not in self.projects_data: - raise ConfigError(f"The selected project {project_name} does not exist") + raise ConfigError( + f"The selected project {project_name} does not exist " + "or could not be parsed correctly" + ) return self.projects_data[project_name] diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py index 8be6de2a..43d81303 100644 --- a/src/jobflow_remote/jobs/data.py +++ b/src/jobflow_remote/jobs/data.py @@ -19,7 +19,7 @@ def get_initial_job_doc_dict( job: Job, parents: Optional[list[str]], - db_id: int, + db_id: str, worker: str, exec_config: Optional[ExecutionConfig], resources: Optional[Union[dict, QResources]], @@ -121,7 +121,7 @@ class JobInfo(BaseModel): uuid: str index: int - db_id: int + db_id: str worker: str name: str state: JobState @@ -226,7 +226,7 @@ class JobDoc(BaseModel): job: Job uuid: str index: int - db_id: int + db_id: str worker: str state: JobState remote: RemoteInfo = RemoteInfo() @@ -296,7 +296,7 @@ class FlowDoc(BaseModel): # This dictionary include {job uuid: {job index: [parent's uuids]}} parents: dict[str, dict[str, list[str]]] = Field(default_factory=dict) # ids correspond to db_id, uuid, index for each JobDoc - ids: list[tuple[int, str, int]] = Field(default_factory=list) + ids: list[tuple[str, str, int]] = Field(default_factory=list) def as_db_dict(self) -> dict: """ @@ -348,7 +348,7 @@ def add_descendants(uuid): return list(descendants) @cached_property - def ids_mapping(self) -> dict[str, dict[int, int]]: + def ids_mapping(self) -> dict[str, dict[int, str]]: d: dict = defaultdict(dict) for db_id, job_id, index in self.ids: @@ -373,7 +373,7 @@ class FlowInfo(BaseModel): Mainly for visualization purposes. 
""" - db_ids: list[int] + db_ids: list[str] job_ids: list[str] job_indexes: list[int] flow_id: str @@ -439,7 +439,7 @@ def from_query_dict(cls, d) -> "FlowInfo": ) @cached_property - def ids_mapping(self) -> dict[str, dict[int, int]]: + def ids_mapping(self) -> dict[str, dict[int, str]]: d: dict = defaultdict(dict) for db_id, job_id, index in zip(self.db_ids, self.job_ids, self.job_indexes): diff --git a/src/jobflow_remote/jobs/graph.py b/src/jobflow_remote/jobs/graph.py index 2fadfc7b..3e8c5c83 100644 --- a/src/jobflow_remote/jobs/graph.py +++ b/src/jobflow_remote/jobs/graph.py @@ -231,7 +231,7 @@ def add_subgraph(nested_hosts_hierarchy, indent_level=0): JobState.FAILED.value: RED_COLOR, JobState.PAUSED.value: "#EAE200", JobState.STOPPED.value: RED_COLOR, - JobState.CANCELLED.value: RED_COLOR, + JobState.USER_STOPPED.value: RED_COLOR, JobState.BATCH_SUBMITTED.value: BLUE_COLOR, JobState.BATCH_RUNNING.value: BLUE_COLOR, } diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index eb37389a..5ed1a5f3 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -14,6 +14,7 @@ import pymongo from jobflow import JobStore, OnMissing from maggma.stores import MongoStore +from monty.json import MontyDecoder from monty.serialization import loadfn from qtoolkit.core.data_objects import CancelStatus, QResources @@ -119,9 +120,7 @@ def from_project_name(cls, project_name: str | None = None) -> JobController: """ config_manager: ConfigManager = ConfigManager() project: Project = config_manager.get_project(project_name) - queue_store = project.get_queue_store() - jobstore = project.get_jobstore() - return cls(queue_store=queue_store, jobstore=jobstore, project=project) + return cls.from_project(project=project) @classmethod def from_project(cls, project: Project) -> JobController: @@ -139,8 +138,16 @@ def from_project(cls, project: Project) -> JobController: An instance of JobController associated with the project. 
""" queue_store = project.get_queue_store() + flows_collection = project.queue.flows_collection + auxiliary_collection = project.queue.auxiliary_collection jobstore = project.get_jobstore() - return cls(queue_store=queue_store, jobstore=jobstore, project=project) + return cls( + queue_store=queue_store, + jobstore=jobstore, + flows_collection=flows_collection, + auxiliary_collection=auxiliary_collection, + project=project, + ) def close(self): """ @@ -163,7 +170,7 @@ def close(self): def _build_query_job( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, locked: bool = False, @@ -257,7 +264,7 @@ def _build_query_job( def _build_query_flow( self, job_ids: str | list[str] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | None = None, state: FlowState | None = None, start_date: datetime | None = None, @@ -360,7 +367,7 @@ def get_jobs_info_query(self, query: dict = None, **kwargs) -> list[JobInfo]: def get_jobs_info( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -449,7 +456,7 @@ def get_jobs_doc_query(self, query: dict = None, **kwargs) -> list[JobDoc]: def get_jobs_doc( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -514,7 +521,7 @@ def get_jobs_doc( @staticmethod def generate_job_id_query( - db_id: int | None = None, + db_id: str | None = None, job_id: str | None = None, job_index: int | None = None, ) -> tuple[dict, list | None]: @@ -562,7 +569,7 @@ def generate_job_id_query( def get_job_info( self, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, ) -> JobInfo | None: """ @@ -599,7 +606,7 @@ def _many_jobs_action( method: Callable, action_description: str, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -687,7 +694,7 @@ def _many_jobs_action( def rerun_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -765,7 +772,7 @@ def rerun_jobs( def rerun_job( self, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, force: bool = False, wait: int | None = None, @@ -1063,7 +1070,7 @@ def _reset_remote(self, doc: dict) -> dict: def _set_job_properties( self, values: dict, - db_id: int | None = None, + db_id: str | None = None, job_id: str | None = None, job_index: int | None = None, wait: int | None = None, @@ -1135,7 +1142,7 @@ def set_job_state( self, state: JobState, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, wait: int | None = 
None, break_lock: bool = False, @@ -1190,7 +1197,7 @@ def set_job_state( def retry_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -1264,7 +1271,7 @@ def retry_jobs( def retry_job( self, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, @@ -1347,7 +1354,7 @@ def retry_job( def pause_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -1412,10 +1419,10 @@ def pause_jobs( wait=wait, ) - def cancel_jobs( + def stop_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -1427,8 +1434,8 @@ def cancel_jobs( break_lock: bool = False, ) -> list[int]: """ - Cancel selected Jobs. Only Jobs in the READY and all the running states - can be cancelled. + Stop selected Jobs. Only Jobs in the READY and all the running states + can be stopped. The action is not reversible. Parameters @@ -1472,8 +1479,8 @@ def cancel_jobs( List of db_ids of the updated Jobs. """ return self._many_jobs_action( - method=self.cancel_job, - action_description="cancelling", + method=self.stop_job, + action_description="stopping", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, @@ -1487,17 +1494,17 @@ def cancel_jobs( break_lock=break_lock, ) - def cancel_job( + def stop_job( self, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, ) -> list[int]: """ - Cancel a single Job. Only Jobs in the READY and all the running states - can be cancelled. + Stop a single Job. Only Jobs in the READY and all the running states + can be stopped. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. The action is not reversible. 
@@ -1549,18 +1556,22 @@ def cancel_job( f"Failed cancelling the process for Job {job_doc['uuid']} {job_doc['index']}", exc_info=True, ) - updated_states = {job_id: {job_index: JobState.CANCELLED}} + job_id = job_doc["uuid"] + job_index = job_doc["index"] + updated_states = {job_id: {job_index: JobState.USER_STOPPED}} self.update_flow_state( flow_uuid=flow_lock.locked_document["uuid"], updated_states=updated_states, ) - job_lock.update_on_release = {"$set": {"state": JobState.CANCELLED.value}} + job_lock.update_on_release = { + "$set": {"state": JobState.USER_STOPPED.value} + } return [job_lock.locked_document["db_id"]] def pause_job( self, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, wait: int | None = None, ) -> list[int]: @@ -1600,6 +1611,9 @@ def pause_job( job_lock_kwargs=job_lock_kwargs, flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): + job_doc = job_lock.locked_document + job_id = job_doc["uuid"] + job_index = job_doc["index"] updated_states = {job_id: {job_index: JobState.PAUSED}} self.update_flow_state( flow_uuid=flow_lock.locked_document["uuid"], @@ -1611,7 +1625,7 @@ def pause_job( def play_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -1684,7 +1698,7 @@ def play_jobs( def play_job( self, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, @@ -1731,6 +1745,8 @@ def play_job( flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document + job_id = job_doc["uuid"] + job_index = job_doc["index"] on_missing = job_doc["job"]["config"]["on_missing_references"] allow_failed = on_missing != OnMissing.ERROR.value @@ -1764,7 +1780,7 @@ def set_job_run_properties( resources: dict | QResources | None = None, update: bool = True, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -1924,7 +1940,7 @@ def get_flow_job_aggreg( def get_flows_info( self, job_ids: str | list[str] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | None = None, state: FlowState | None = None, start_date: datetime | None = None, @@ -2070,7 +2086,7 @@ def delete_flow(self, flow_id: str, delete_output: bool = False): def remove_lock_job( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, start_date: datetime | None = None, @@ -2133,7 +2149,7 @@ def remove_lock_job( def remove_lock_flow( self, job_ids: str | list[str] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | None = None, state: FlowState | None = None, start_date: datetime | None = None, @@ -2318,7 +2334,7 @@ def get_job_info_by_job_uuid( def get_job_doc( self, job_id: str | None = None, - db_id: int | None = None, + db_id: str | None = None, job_index: int | None = None, ) -> JobDoc | None: query, sort = 
self.generate_job_id_query(db_id, job_id, job_index) @@ -2336,7 +2352,7 @@ def count_jobs( self, query: dict | None = None, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, state: JobState | None = None, locked: bool = False, @@ -2363,7 +2379,7 @@ def count_flows( self, query: dict | None = None, job_ids: str | list[str] | None = None, - db_ids: int | list[int] | None = None, + db_ids: str | list[str] | None = None, flow_ids: str | None = None, state: FlowState | None = None, start_date: datetime | None = None, @@ -2398,7 +2414,7 @@ def add_flow( allow_external_references: bool = False, exec_config: ExecutionConfig | None = None, resources: dict | QResources | None = None, - ) -> list[int]: + ) -> list[str]: from jobflow.core.flow import get_flow flow = get_flow(flow, allow_external_references=allow_external_references) @@ -2417,7 +2433,11 @@ def add_flow( ) first_id = doc_next_id["next_id"] db_ids = [] - for (job, parents), db_id in zip(jobs_list, range(first_id, first_id + n_jobs)): + for (job, parents), db_id_int in zip( + jobs_list, range(first_id, first_id + n_jobs) + ): + prefix = self.project.queue.db_id_prefix or "" + db_id = f"{prefix}{db_id_int}" db_ids.append(db_id) job_dicts.append( get_initial_job_doc_dict( @@ -2445,23 +2465,53 @@ def add_flow( def _append_flow( self, - job_doc: JobDoc, + job_doc: dict, flow_dict: dict, - new_flow: jobflow.Flow | jobflow.Job | list[jobflow.Job], + new_flow_dict: dict, worker: str, response_type: DynamicResponseType, exec_config: ExecutionConfig | None = None, resources: QResources | None = None, ): - from jobflow.core.flow import get_flow + from jobflow import Flow, Job + + decoder = MontyDecoder() + + def deserialize_partial_flow(in_dict: dict): + """ + Recursively deserialize a Flow dictionary, avoiding the deserialization + of all the elements that may require external packages. + """ + if in_dict.get("@class", None) == "Flow": + jobs = [deserialize_partial_flow(d) for d in in_dict.get("jobs")] + flow_init = { + k: v + for k, v in in_dict.items() + if k not in ("@module", "@class", "@version", "job") + } + flow_init["jobs"] = jobs + return Flow(**flow_init) + # if it is not a Flow, should be a Job + job_init = { + k: v + for k, v in in_dict.items() + if k not in ("@module", "@class", "@version") + } + job_init["config"] = decoder.process_decoded(job_init["config"]) + return Job(**job_init) - new_flow = get_flow(new_flow, allow_external_references=True) + # It is sure that the new_flow_dict is a serialized Flow (and not Job + # or list[Job]), because the get_flow has already been applied at run + # time, during the remote execution. + # Recursive deserialize the Flow without deserializing function and + # arguments to take advantage of standard Flow/Job methods. 
+ new_flow = deserialize_partial_flow(new_flow_dict) # get job parents if response_type == DynamicResponseType.REPLACE: - job_parents = job_doc.parents + job_parents = job_doc["parents"] else: - job_parents = [(job_doc.uuid, job_doc.index)] + job_parents = [(job_doc["uuid"], job_doc["index"])] # add new jobs to flow flow_dict = dict(flow_dict) @@ -2478,9 +2528,11 @@ def _append_flow( job_dicts = [] flow_updates["$set"] = {} ids_to_push = [] - for (job, parents), db_id in zip( + for (job, parents), db_id_int in zip( jobs_list, range(first_id, first_id + n_new_jobs) ): + prefix = self.project.queue.db_id_prefix or "" + db_id = f"{prefix}{db_id_int}" # inherit the parents of the job to which we are appending parents = parents if parents else job_parents job_dicts.append( @@ -2501,7 +2553,8 @@ def _append_flow( # if detour, update the parents of the child jobs leaf_uuids = [v for v, d in new_flow.graph.out_degree() if d == 0] self.jobs.update_many( - {"parents": job_doc.uuid}, {"$push": {"parents": {"$each": leaf_uuids}}} + {"parents": job_doc["uuid"]}, + {"$push": {"parents": {"$each": leaf_uuids}}}, ) # flow_dict["updated_on"] = datetime.utcnow() @@ -2594,7 +2647,7 @@ def checkout_job( # TODO if jobstore is not an option anymore, the "store" argument # can be removed and just use self.jobstore. def complete_job( - self, job_doc: JobDoc, local_path: Path | str, store: JobStore + self, job_doc: dict, local_path: Path | str, store: JobStore ) -> bool: # Don't sleep if the flow is locked. Only the Runner should call this, # and it will handle the fact of having a locked Flow. @@ -2602,11 +2655,12 @@ def complete_job( # avoids parsing (potentially large) files to discover that the flow is # already locked. with self.lock_flow( - filter={"jobs": job_doc.uuid}, get_locked_doc=True + filter={"jobs": job_doc["uuid"]}, get_locked_doc=True ) as flow_lock: if flow_lock.locked_document: local_path = Path(local_path) out_path = local_path / OUT_FILENAME + host_flow_id = job_doc["job"]["hosts"][-1] if not out_path.exists(): msg = ( f"The output file {OUT_FILENAME} was not present in the download " @@ -2615,13 +2669,16 @@ def complete_job( self.checkin_job( job_doc, flow_lock.locked_document, response=None, error=msg ) - self.update_flow_state(job_doc.job.hosts[-1]) + self.update_flow_state(host_flow_id) return True - out = loadfn(out_path) - doc_update = {"start_time": out["start_time"]} + # do not deserialize the response or stored data, saves time and + # avoids the need for packages to be installed. 
+                out = loadfn(out_path, cls=None)
+                decoder = MontyDecoder()
+                doc_update = {"start_time": decoder.process_decoded(out["start_time"])}
                 # update the time of the JobDoc, will be used in the checkin
-                end_time = out.get("end_time")
+                end_time = decoder.process_decoded(out.get("end_time"))
                 if end_time:
                     doc_update["end_time"] = end_time
@@ -2634,7 +2691,7 @@ def complete_job(
                         error=error,
                         doc_update=doc_update,
                     )
-                    self.update_flow_state(job_doc.job.hosts[-1])
+                    self.update_flow_state(host_flow_id)
                     return True
 
                 response = out.get("response")
@@ -2651,24 +2708,20 @@ def complete_job(
                         error=msg,
                         doc_update=doc_update,
                     )
-                    self.update_flow_state(job_doc.job.hosts[-1])
+                    self.update_flow_state(host_flow_id)
                     return True
 
-                save = {
-                    k: "output" if v is True else v
-                    for k, v in job_doc.job._kwargs.items()
-                }
-                remote_store = get_remote_store(store, local_path)
-                remote_store.connect()
-                update_store(store, remote_store, save, job_doc.db_id)
+
+                update_store(store, remote_store, job_doc["db_id"])
+
                 self.checkin_job(
                     job_doc,
                     flow_lock.locked_document,
                     response=response,
                     doc_update=doc_update,
                 )
-                self.update_flow_state(job_doc.job.hosts[-1])
+                self.update_flow_state(host_flow_id)
                 return True
             elif flow_lock.unavailable_document:
                 # raising the error if the lock could not be acquired leaves
@@ -2682,9 +2735,9 @@ def complete_job(
 
     def checkin_job(
         self,
-        job_doc: JobDoc,
+        job_doc: dict,
         flow_dict: dict,
-        response: jobflow.Response | None,
+        response: dict | None,
         error: str | None = None,
         doc_update: dict | None = None,
     ):
@@ -2694,51 +2747,47 @@ def checkin_job(
         # handle response
         else:
             new_state = JobState.COMPLETED.value
-            if response.replace is not None:
+            if response["replace"] is not None:
                 self._append_flow(
                     job_doc,
                     flow_dict,
-                    response.replace,
+                    response["replace"],
                     response_type=DynamicResponseType.REPLACE,
-                    worker=job_doc.worker,
-                    exec_config=job_doc.exec_config,
-                    resources=job_doc.resources,
+                    worker=job_doc["worker"],
+                    exec_config=job_doc["exec_config"],
+                    resources=job_doc["resources"],
                 )
-            if response.addition is not None:
+            if response["addition"] is not None:
                 self._append_flow(
                     job_doc,
                     flow_dict,
-                    response.addition,
+                    response["addition"],
                     response_type=DynamicResponseType.ADDITION,
-                    worker=job_doc.worker,
-                    exec_config=job_doc.exec_config,
-                    resources=job_doc.resources,
+                    worker=job_doc["worker"],
+                    exec_config=job_doc["exec_config"],
+                    resources=job_doc["resources"],
                 )
-            if response.detour is not None:
+            if response["detour"] is not None:
                 self._append_flow(
                     job_doc,
                     flow_dict,
-                    response.detour,
+                    response["detour"],
                     response_type=DynamicResponseType.DETOUR,
-                    worker=job_doc.worker,
-                    exec_config=job_doc.exec_config,
-                    resources=job_doc.resources,
+                    worker=job_doc["worker"],
+                    exec_config=job_doc["exec_config"],
+                    resources=job_doc["resources"],
                 )
-            if response.stored_data is not None:
-                from monty.json import jsanitize
-
-                stored_data = jsanitize(
-                    response.stored_data, strict=True, enum_values=True
-                )
+            if response["stored_data"] is not None:
+                stored_data = response["stored_data"]
 
-            if response.stop_children:
-                self.stop_children(job_doc.uuid)
+            if response["stop_children"]:
+                self.stop_children(job_doc["uuid"])
 
-            if response.stop_jobflow:
-                self.stop_jobflow(job_uuid=job_doc.uuid)
+            if response["stop_jobflow"]:
+                self.stop_jobflow(job_uuid=job_doc["uuid"])
 
         if not doc_update:
             doc_update = {}
@@ -2747,16 +2796,16 @@ def checkin_job(
         )
 
         result = self.jobs.update_one(
-            {"uuid": job_doc.uuid, "index": job_doc.index}, {"$set": doc_update}
+            {"uuid": job_doc["uuid"], "index": job_doc["index"]}, {"$set": doc_update}
         )
         if result.modified_count == 0:
             raise RuntimeError(
-                f"The job {job_doc.uuid} index {job_doc.index} has not been updated in the database"
+                f"The job {job_doc['uuid']} index {job_doc['index']} has not been updated in the database"
            )
 
         # TODO it should be fine to replace this query by constructing the list of
         # job uuids from the original + those added. Should be verified.
-        job_uuids = self.get_flow_info_by_job_uuid(job_doc.uuid, ["jobs"])["jobs"]
+        job_uuids = self.get_flow_info_by_job_uuid(job_doc["uuid"], ["jobs"])["jobs"]
         return len(self.refresh_children(job_uuids)) + 1
 
     # TODO should this refresh all the kind of states? Or just set to ready?
@@ -2794,7 +2843,9 @@ def refresh_children(self, job_uuids: list[str]) -> list[int]:
                job.get("job", {}).get("config", {}).get("on_missing_references", None)
            )
            if on_missing_ref == jobflow.OnMissing.NONE.value:
-                allowed_states.extend((JobState.FAILED.value, JobState.CANCELLED.value))
+                allowed_states.extend(
+                    (JobState.FAILED.value, JobState.USER_STOPPED.value)
+                )
            if job["state"] == JobState.WAITING.value and all(
                [jobs_mapping[p]["state"] in allowed_states for p in job["parents"]]
            ):
@@ -3118,7 +3169,7 @@ def lock_job_for_update(
    def lock_job_flow(
        self,
        job_id: str | None = None,
-        db_id: int | None = None,
+        db_id: str | None = None,
        job_index: int | None = None,
        wait: int | None = None,
        break_lock: bool = False,
diff --git a/src/jobflow_remote/jobs/run.py b/src/jobflow_remote/jobs/run.py
index aa391bf2..9f1282e6 100644
--- a/src/jobflow_remote/jobs/run.py
+++ b/src/jobflow_remote/jobs/run.py
@@ -10,6 +10,7 @@
 from pathlib import Path
 
 from jobflow import JobStore, initialize_logger
+from jobflow.core.flow import get_flow
 from jobflow.core.job import Job
 from monty.os import cd
 from monty.serialization import dumpfn, loadfn
@@ -62,6 +63,17 @@ def run_remote_job(run_dir: str | Path = "."):
 
        # The output of the response has already been stored in the store.
        response.output = None
+
+        # Convert the dynamic responses to Flows before dumping the output.
+        # This is required so that the response does not need to be
+        # deserialized and converted to Flows by the runner.
+        if response.addition:
+            response.addition = get_flow(response.addition)
+        if response.detour:
+            response.detour = get_flow(response.detour)
+        if response.replace:
+            response.replace = get_flow(response.replace)
+
        output = {
            "response": response,
            "error": error,
diff --git a/src/jobflow_remote/jobs/runner.py b/src/jobflow_remote/jobs/runner.py
index 3fa9ff13..ba57209b 100644
--- a/src/jobflow_remote/jobs/runner.py
+++ b/src/jobflow_remote/jobs/runner.py
@@ -15,6 +15,7 @@
 from typing import TYPE_CHECKING
 
 from jobflow.utils import suuid
+from monty.json import MontyDecoder
 from monty.os import makedirs_p
 from qtoolkit.core.data_objects import QState, SubmissionStatus
 
@@ -29,13 +30,14 @@
 )
 from jobflow_remote.config.manager import ConfigManager
 from jobflow_remote.jobs.batch import RemoteBatchManager
-from jobflow_remote.jobs.data import IN_FILENAME, OUT_FILENAME, JobDoc, RemoteError
+from jobflow_remote.jobs.data import IN_FILENAME, OUT_FILENAME, RemoteError
 from jobflow_remote.jobs.state import JobState
 from jobflow_remote.remote.data import (
    get_job_path,
    get_remote_in_file,
    get_remote_store,
    get_remote_store_filenames,
+    resolve_job_dict_args,
 )
 from jobflow_remote.remote.host import BaseHost
 from jobflow_remote.remote.queue import ERR_FNAME, OUT_FNAME, QueueManager, set_name_out
@@ -381,24 +383,23 @@ def upload(self, lock: MongoLock):
        db_id = doc["db_id"]
        logger.debug(f"upload db_id: {db_id}")
 
-        job_doc = JobDoc(**doc)
-        job = job_doc.job
+        job_dict = doc["job"]
 
-        worker = self.get_worker(job_doc.worker)
-        host = self.get_host(job_doc.worker)
+        worker = self.get_worker(doc["worker"])
+        host = self.get_host(doc["worker"])
        store = self.jobstore
        # TODO would it be better/feasible to keep a pool of the required
        # Stores already connected, to avoid opening and closing them?
        store.connect()
        try:
-            job.resolve_args(store=store, inplace=True)
+            resolve_job_dict_args(job_dict, store)
        finally:
            try:
                store.close()
            except Exception:
                logging.error(f"error while closing the store {store}", exc_info=True)
 
-        remote_path = get_job_path(job.uuid, job.index, worker.work_dir)
+        remote_path = get_job_path(job_dict["uuid"], job_dict["index"], worker.work_dir)
 
        # Set the value of the original store for dynamical workflow. Usually it
        # will be None don't add the serializer, at this stage the default_orjson
@@ -416,7 +417,7 @@ def upload(self, lock: MongoLock):
            logger.error(err_msg)
            raise RemoteError(err_msg, no_retry=False)
 
-        serialized_input = get_remote_in_file(job, remote_store)
+        serialized_input = get_remote_in_file(job_dict, remote_store)
 
        path_file = Path(remote_path, IN_FILENAME)
        host.put(serialized_input, str(path_file))
@@ -439,34 +440,35 @@ def submit(self, lock: MongoLock):
        doc = lock.locked_document
        logger.debug(f"submit db_id: {doc['db_id']}")
 
-        job_doc = JobDoc(**doc)
-        job = job_doc.job
+        job_dict = doc["job"]
 
-        worker = self.get_worker(job_doc.worker)
+        worker_name = doc["worker"]
+        worker = self.get_worker(worker_name)
 
-        remote_path = Path(job_doc.run_dir)
+        remote_path = Path(doc["run_dir"])
 
        script_commands = [f"jf execution run {remote_path}"]
 
-        queue_manager = self.get_queue_manager(job_doc.worker)
+        queue_manager = self.get_queue_manager(worker_name)
        qout_fpath = remote_path / OUT_FNAME
        qerr_fpath = remote_path / ERR_FNAME
 
-        exec_config = job_doc.exec_config
+        exec_config = doc["exec_config"]
        if isinstance(exec_config, str):
            exec_config = self.config_manager.get_exec_config(
                exec_config_name=exec_config, project_name=self.project_name
            )
        elif isinstance(exec_config, dict):
-            exec_config = ExecutionConfig.parse_obj(job_doc.exec_config)
+            exec_config = ExecutionConfig.parse_obj(exec_config)
 
        # define an empty default if it is not set
        exec_config = exec_config or ExecutionConfig()
 
-        if job_doc.worker in self.batch_workers:
+        if worker_name in self.batch_workers:
            resources: dict = {}
+
            set_name_out(
-                resources, job.name, out_fpath=qout_fpath, err_fpath=qerr_fpath
+                resources, job_dict["name"], out_fpath=qout_fpath, err_fpath=qerr_fpath
            )
            shell_manager = queue_manager.get_shell_manager()
            shell_manager.write_submission_script(
@@ -480,8 +482,8 @@ def submit(self, lock: MongoLock):
                create_submit_dir=False,
            )
 
-            self.batch_workers[job_doc.worker].submit_job(
-                job_id=job_doc.uuid, index=job_doc.index
+            self.batch_workers[worker_name].submit_job(
+                job_id=doc["uuid"], index=doc["index"]
            )
            lock.update_on_release = {
                "$set": {
@@ -489,9 +491,14 @@ def submit(self, lock: MongoLock):
                }
            }
        else:
-            resources = job_doc.resources or worker.resources or {}
+            # decode in case it contains a QResources. It was not deserialized before.
+            resources = (
+                MontyDecoder().process_decoded(doc["resources"])
+                or worker.resources
+                or {}
+            )
            set_name_out(
-                resources, job.name, out_fpath=qout_fpath, err_fpath=qerr_fpath
+                resources, job_dict["name"], out_fpath=qout_fpath, err_fpath=qerr_fpath
            )
 
            pre_run = worker.pre_run or ""
@@ -525,8 +532,8 @@ def submit(self, lock: MongoLock):
                    "state": JobState.SUBMITTED.value,
                }
            }
-            if job_doc.worker in self.limited_workers:
-                self.limited_workers[job_doc.worker]["current"] += 1
+            if worker_name in self.limited_workers:
+                self.limited_workers[worker_name]["current"] += 1
        else:
            raise RemoteError(
                f"unhandled submission status {submit_result.status}", True
@@ -545,20 +552,22 @@ def download(self, lock):
        doc = lock.locked_document
        logger.debug(f"download db_id: {doc['db_id']}")
 
-        job_doc = JobDoc(**doc)
-        job = job_doc.job
+        # job_doc = JobDoc(**doc)
+        job_dict = doc["job"]
 
        # If the worker is local do not copy the files in the temporary folder
        # It should not arrive to this point, since it should go directly
        # from SUBMITTED/RUNNING to DOWNLOADED in case of local worker
-        worker = self.get_worker(job_doc.worker)
+        worker = self.get_worker(doc["worker"])
        if not worker.is_local:
-            host = self.get_host(job_doc.worker)
+            host = self.get_host(doc["worker"])
            store = self.jobstore
 
-            remote_path = job_doc.run_dir
+            remote_path = doc["run_dir"]
            local_base_dir = Path(self.project.tmp_dir, "download")
-            local_path = get_job_path(job.uuid, job.index, local_base_dir)
+            local_path = get_job_path(
+                job_dict["uuid"], job_dict["index"], local_base_dir
+            )
 
            makedirs_p(local_path)
@@ -573,9 +582,7 @@ def download(self, lock):
                    host.get(remote_file_path, str(Path(local_path, fname)))
                except FileNotFoundError:
                    # if files are missing it should not retry
-                    err_msg = (
-                        f"file {remote_file_path} for job {job.uuid} does not exist"
-                    )
+                    err_msg = f"file {remote_file_path} for job {job_dict['uuid']} does not exist"
                    logger.error(err_msg)
                    raise RemoteError(err_msg, True)
@@ -604,9 +611,8 @@ def complete_job(self, lock):
        local_path = get_job_path(doc["uuid"], doc["index"], local_base_dir)
 
        try:
-            job_doc = JobDoc(**doc)
            store = self.jobstore
-            completed = self.job_controller.complete_job(job_doc, local_path, store)
+            completed = self.job_controller.complete_job(doc, local_path, store)
 
        except json.JSONDecodeError:
            # if an empty file is copied this error can appear, do not retry
diff --git a/src/jobflow_remote/jobs/state.py b/src/jobflow_remote/jobs/state.py
index 4d84188a..86c2f7cf 100644
--- a/src/jobflow_remote/jobs/state.py
+++ b/src/jobflow_remote/jobs/state.py
@@ -21,7 +21,7 @@ class JobState(Enum):
    FAILED = "FAILED"
    PAUSED = "PAUSED"
    STOPPED = "STOPPED"
-    CANCELLED = "CANCELLED"
+    USER_STOPPED = "USER_STOPPED"
    BATCH_SUBMITTED = "BATCH_SUBMITTED"
    BATCH_RUNNING = "BATCH_RUNNING"
 
@@ -44,7 +44,7 @@ def short_value(self) -> str:
            JobState.FAILED: "F",
            JobState.PAUSED: "P",
            JobState.STOPPED: "ST",
-            JobState.CANCELLED: "CA",
+            JobState.USER_STOPPED: "CA",
            JobState.BATCH_SUBMITTED: "BS",
            JobState.BATCH_RUNNING: "BR",
        }
@@ -85,7 +85,6 @@ class FlowState(Enum):
    FAILED = "FAILED"
    PAUSED = "PAUSED"
    STOPPED = "STOPPED"
-    CANCELLED = "CANCELLED"
 
    @classmethod
    def from_jobs_states(
@@ -122,10 +121,8 @@ def from_jobs_states(
        # when applying the change in the remote state.
        elif any(js == JobState.FAILED for js in jobs_states):
            return cls.FAILED
-        elif any(js == JobState.STOPPED for js in jobs_states):
+        elif any(js in (JobState.STOPPED, JobState.USER_STOPPED) for js in jobs_states):
            return cls.STOPPED
-        elif any(js == JobState.CANCELLED for js in jobs_states):
-            return cls.CANCELLED
        elif any(js == JobState.PAUSED for js in jobs_states):
            return cls.PAUSED
        else:
diff --git a/src/jobflow_remote/remote/data.py b/src/jobflow_remote/remote/data.py
index 6240a163..15b80059 100644
--- a/src/jobflow_remote/remote/data.py
+++ b/src/jobflow_remote/remote/data.py
@@ -80,25 +80,89 @@ def get_remote_store_filenames(store: JobStore) -> list[str]:
    return filenames
 
 
-def update_store(store, remote_store, save, db_id):
-    # TODO is it correct?
-    data = list(remote_store.query(load=save))
-    if len(data) > 1:
-        raise RuntimeError("something wrong with the remote store")
-
-    # Set the db_id here and not directly in the Job's metadata to avoid
-    # that it gets passed down to its children/replacements.
-    if "db_id" not in data[0]["metadata"]:
-        data[0]["metadata"]["db_id"] = db_id
-
-    store.connect()
+def update_store(store: JobStore, remote_store: JobStore, db_id: int):
    try:
-        for d in data:
-            data = dict(d)
-            data.pop("_id")
-            store.update(data, key=["uuid", "index"], save=save)
+        store.connect()
+        remote_store.connect()
+
+        additional_stores = set(store.additional_stores.keys())
+        additional_remote_stores = set(remote_store.additional_stores.keys())
+
+        # This checks that the additional stores in the local and remote JobStore
+        # match. A mismatch should only happen because of a bug, so the check could
+        # maybe be removed
+        if additional_stores ^ additional_remote_stores:
+            raise ValueError(
+                f"The additional stores in the local and remote JobStore do not "
+                f"match: {additional_stores ^ additional_remote_stores}"
+            )
+
+        # Copy the data store by store, rather than going through the JobStore
+        # directly. This avoids the need to deserialize the store content and to
+        # use the "save" argument.
+        for add_store_name, remote_add_store in remote_store.additional_stores.items():
+            add_store = store.additional_stores[add_store_name]
+
+            for d in remote_add_store.query():
+                data = dict(d)
+                data.pop("_id", None)
+                add_store.update(data)
+        main_docs_list = list(remote_store.docs_store.query({}))
+        if len(main_docs_list) > 1:
+            raise RuntimeError(
+                "The downloaded output store contains more than one document"
+            )
+        main_doc = main_docs_list[0]
+        main_doc.pop("_id", None)
+        # Set the db_id here and not directly in the Job's metadata to prevent
+        # it from being propagated to its children/replacements.
+        if "db_id" not in main_doc["metadata"]:
+            main_doc["metadata"]["db_id"] = db_id
+        store.docs_store.update(main_doc, key=["uuid", "index"])
    finally:
        try:
            store.close()
        except Exception:
            logging.error(f"error while closing the store {store}", exc_info=True)
+        try:
+            remote_store.close()
+        except Exception:
+            logging.error(
+                f"error while closing the remote store {remote_store}", exc_info=True
+            )
+
+
+def resolve_job_dict_args(job_dict: dict, store: JobStore) -> dict:
+    """
+    Resolve the references in a serialized Job.
+
+    Similar to Job.resolve_args, but without the need to deserialize the Job.
+    The references are resolved in place.
+
+    Parameters
+    ----------
+    job_dict
+        The serialized version of a Job.
+    store
+        The JobStore from which the references should be resolved.
+
+    Returns
+    -------
+    The updated version of the input dictionary with references resolved.
+ """ + from jobflow.core.reference import OnMissing, find_and_resolve_references + + on_missing = OnMissing(job_dict["config"]["on_missing_references"]) + cache: dict[str, Any] = {} + resolved_args = find_and_resolve_references( + job_dict["function_args"], store, cache=cache, on_missing=on_missing + ) + resolved_kwargs = find_and_resolve_references( + job_dict["function_kwargs"], store, cache=cache, on_missing=on_missing + ) + resolved_args = tuple(resolved_args) + + # substitution is in place + job_dict["function_args"] = resolved_args + job_dict["function_kwargs"] = resolved_kwargs + return job_dict