Tests and updates #92

Merged (6 commits) on Mar 19, 2024
10 changes: 8 additions & 2 deletions .github/workflows/testing.yml
@@ -35,6 +35,12 @@ jobs:
         run: pre-commit run --all-files --show-diff-on-failure

   test:
+    services:
+      local_mongodb:
+        image: mongo:4.4
+        ports:
+          - 27017:27017
+
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
@@ -57,10 +63,10 @@ jobs:
           pip install .[tests]

       - name: Unit tests
-        run: pytest --cov=jobflow_remote --cov-report=xml --ignore tests/integration
+        run: pytest --cov=jobflow_remote --cov-report=xml --cov-config pyproject.toml --ignore tests/integration

       - name: Integration tests
-        run: pytest --cov=jobflow_remote --cov-append --cov-report=xml tests/integration
+        run: pytest --cov=jobflow_remote --cov-append --cov-report=xml --cov-config pyproject.toml tests/integration

       - name: Upload coverage reports to Codecov
        uses: codecov/codecov-action@v4
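The new `local_mongodb` service gives the test jobs a disposable MongoDB reachable on `localhost:27017`. As a rough sketch of how a test could talk to it (the fixture below is illustrative and not part of this PR; it assumes `pymongo` is installed):

```python
import pymongo
import pytest


@pytest.fixture(scope="session")
def mongo_client():
    # The service container maps port 27017 onto the runner host,
    # so a plain localhost connection string is enough.
    client = pymongo.MongoClient(
        "mongodb://localhost:27017", serverSelectionTimeoutMS=5000
    )
    client.admin.command("ping")  # fail fast if the service is not reachable
    yield client
    client.close()
```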
2 changes: 1 addition & 1 deletion doc/source/_static/project_schema.html

Large diffs are not rendered by default.

67 changes: 64 additions & 3 deletions src/jobflow_remote/cli/admin.py
@@ -9,6 +9,8 @@
 from jobflow_remote.cli.types import (
     db_ids_opt,
     end_date_opt,
+    flow_ids_opt,
+    flow_state_opt,
     force_opt,
     job_ids_indexes_opt,
     job_state_opt,
@@ -158,17 +160,76 @@ def unlock(
     if not confirmed:
         raise typer.Exit(0)

+    with loading_spinner(False) as progress:
+        progress.add_task(description="Unlocking jobs...", total=None)
+
+        num_unlocked = jc.unlock_jobs(
+            job_ids=job_id,
+            db_ids=db_id,
+            state=state,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+    out_console.print(f"{num_unlocked} jobs were unlocked")
+
+
+@app_admin.command()
+def unlock_flow(
+    job_id: job_ids_indexes_opt = None,
+    db_id: db_ids_opt = None,
+    flow_id: flow_ids_opt = None,
+    state: flow_state_opt = None,
+    start_date: start_date_opt = None,
+    end_date: end_date_opt = None,
+    force: force_opt = False,
+):
+    """
+    Forcibly removes the lock from the documents of the selected Flows.
+    WARNING: can lead to inconsistencies if the process is actually running.
+    """
+
+    job_ids_indexes = get_job_ids_indexes(job_id)
+
+    jc = get_job_controller()
+
+    if not force:
+        with loading_spinner(False) as progress:
+            progress.add_task(
+                description="Checking the number of locked documents...", total=None
+            )

-            num_unlocked = jc.remove_lock_job(
-                job_ids=job_id,
+            flows_info = jc.get_flows_info(
+                job_ids=job_ids_indexes,
                 db_ids=db_id,
+                flow_ids=flow_id,
                 state=state,
                 start_date=start_date,
+                locked=True,
                 end_date=end_date,
             )

-    out_console.print(f"{num_unlocked} jobs were unlocked")
+        if not flows_info:
+            exit_with_error_msg("No data matching the request")
+
+        text = Text.from_markup(
+            f"[red]This operation will [bold]remove the lock[/bold] for (roughly) [bold]{len(flows_info)} Flow(s)[/bold]. Proceed anyway?[/red]"
+        )
+        confirmed = Confirm.ask(text, default=False)
+
+        if not confirmed:
+            raise typer.Exit(0)
+
+    with loading_spinner(False) as progress:
+        progress.add_task(description="Unlocking flows...", total=None)
+
+        num_unlocked = jc.unlock_flows(
+            job_ids=job_id,
+            db_ids=db_id,
+            flow_ids=flow_id,
+            state=state,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+    out_console.print(f"{num_unlocked} flows were unlocked")
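For reference, a minimal sketch of what the new command drives programmatically (the module path for `get_job_controller` is assumed from the CLI imports above, and the flow id is a made-up example):

```python
from jobflow_remote.cli.utils import get_job_controller  # assumed location

jc = get_job_controller()
# Forcibly release the lock on one specific Flow, as `jf admin unlock-flow`
# does after its confirmation prompt (the flow id below is hypothetical).
num_unlocked = jc.unlock_flows(flow_ids=["8d8c9f42-7a31-44d2-9a1f-2f3e8b7c6d5e"])
print(f"{num_unlocked} flows were unlocked")
```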
2 changes: 1 addition & 1 deletion src/jobflow_remote/cli/formatting.py
@@ -167,7 +167,7 @@ def format_flow_info(flow_info: FlowInfo):
     table.title_style = "bold"
     table.add_column("DB id")
     table.add_column("Name")
-    table.add_column("State [Remote]")
+    table.add_column("State")
     table.add_column("Job id (Index)")
     table.add_column("Worker")
6 changes: 0 additions & 6 deletions src/jobflow_remote/cli/jf.py
@@ -7,7 +7,6 @@
 from jobflow_remote.cli.utils import (
     cleanup_job_controller,
     complete_profiling,
-    exit_with_error_msg,
     get_config_manager,
     initialize_config_manager,
     out_console,
@@ -87,11 +86,6 @@ def main(
     initialize_config_manager()
     cm = get_config_manager()
     if project:
-        if project not in cm.projects_data:
-            exit_with_error_msg(
-                f"Project {project} is not defined in {SETTINGS.projects_folder}"
-            )
-
         SETTINGS.project = project

     try:
4 changes: 2 additions & 2 deletions src/jobflow_remote/cli/job.py
@@ -228,7 +228,7 @@ def set_state(
     )

     if not succeeded:
-        exit_with_error_msg("Could not reset the remote attempts")
+        exit_with_error_msg("Could not change the job state")

     print_success_msg()

@@ -675,7 +675,7 @@ def exec_config(
         hours=hours,
         verbosity=verbosity,
         raise_on_error=raise_on_error,
-        exec_config_value=exec_config_value,
+        exec_config=exec_config_value,
     )


2 changes: 1 addition & 1 deletion src/jobflow_remote/cli/project.py
@@ -168,7 +168,7 @@ def check(
     """
     check_incompatible_opt({"jobstore": jobstore, "queue": queue, "worker": worker})

-    cm = ConfigManager(warn=True)
+    cm = get_config_manager()
     project = cm.get_project()

     check_all = all(not v for v in (jobstore, worker, queue))
6 changes: 5 additions & 1 deletion src/jobflow_remote/cli/runner.py
@@ -91,7 +91,10 @@ def run(
     if not (transfer or complete or queue or checkout):
         transfer = complete = queue = checkout = True

-    runner.run(transfer=transfer, complete=complete, queue=queue, checkout=checkout)
+    try:
+        runner.run(transfer=transfer, complete=complete, queue=queue, checkout=checkout)
+    finally:
+        runner.cleanup()


 @app_runner.command()
@@ -261,6 +264,7 @@ def status():
         DaemonStatus.STOPPING: "gold1",
         DaemonStatus.SHUT_DOWN: "red",
         DaemonStatus.PARTIALLY_RUNNING: "gold1",
+        DaemonStatus.STARTING: "gold1",
         DaemonStatus.RUNNING: "green",
     }[current_status]
     text = Text()
3 changes: 3 additions & 0 deletions src/jobflow_remote/cli/utils.py
@@ -69,6 +69,9 @@ def cleanup_job_controller():
     global _shared_job_controller
     if _shared_job_controller is not None:
         _shared_job_controller.close()
+        # set to None again, in case it needs to be used again in the same
+        # execution (e.g., in tests)
+        _shared_job_controller = None


 def start_profiling():
13 changes: 10 additions & 3 deletions src/jobflow_remote/config/base.py
@@ -37,12 +37,12 @@ class RunnerOptions(BaseModel):
     delay_refresh_limited: int = Field(
         600,
         description="Delay between subsequent refresh from the DB of the number of submitted "
-        "and running jobs (seconds). Only use if a worker with max_jobs is present",
+        "and running jobs (seconds). Only used if a worker with max_jobs is present",
     )
     delay_update_batch: int = Field(
         60,
         description="Delay between subsequent refresh from the DB of the number of submitted "
-        "and running jobs (seconds). Only use if a worker with max_jobs is present",
+        "and running jobs (seconds). Only used if a batch worker is present",
     )
     lock_timeout: Optional[int] = Field(
         86400,
@@ -111,6 +111,13 @@ def to_logging(self) -> int:


 class BatchConfig(BaseModel):
+    """
+    Configuration for the execution of batch jobs.
+
+    Allows multiple Jobs to be executed in a single process running on the
+    worker (e.g. a SLURM job).
+    """
+
     jobs_handle_dir: Path = Field(
         description="Absolute path to a folder that will be used to store information to share with the jobs being executed"
     )
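As a rough illustration of the documented behavior, a batch configuration could be created like this (the path is a made-up value, and any fields beyond `jobs_handle_dir` are omitted since only that field appears in this diff):

```python
from pathlib import Path

from jobflow_remote.config.base import BatchConfig

# Hypothetical handle directory on a filesystem shared with the worker
batch = BatchConfig(jobs_handle_dir=Path("/scratch/myuser/jfr_batch_handle"))
```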
@@ -524,7 +531,7 @@ class Project(BaseModel):
     )
     queue: QueueConfig = Field(
         description="The configuration of the Store used to store the states of"
-        "the Jobs and the Flows",
+        " the Jobs and the Flows",
     )
     exec_config: dict[str, ExecutionConfig] = Field(
         default_factory=dict,
2 changes: 1 addition & 1 deletion src/jobflow_remote/config/manager.py
@@ -91,7 +91,7 @@ def load_projects_data(self) -> dict[str, ProjectData]:
         projects_data: dict[str, ProjectData] = {}
         for ext in self.projects_ext:
-            for filepath in glob.glob(str(self.projects_folder / f"*.{ext}")):
+            for filepath in self.projects_folder.glob(str(f"*.{ext}")):
                 try:
                     if ext in ["json", "yaml"]:
                         d = loadfn(filepath)
37 changes: 30 additions & 7 deletions src/jobflow_remote/jobs/daemon.py
@@ -118,6 +118,7 @@ class DaemonStatus(Enum):
     STOPPED = "STOPPED"
     STOPPING = "STOPPING"
     PARTIALLY_RUNNING = "PARTIALLY_RUNNING"
+    STARTING = "STARTING"
     RUNNING = "RUNNING"


@@ -224,7 +225,7 @@ def check_supervisord_process(self) -> bool:
             logger.warning(
                 f"Process with pid {pid} is not running but daemon files are present. Cleaning them up."
             )
-                self.clean_files()
+            self.clean_files()

         return running
@@ -258,7 +259,10 @@ def check_status(self) -> DaemonStatus:
             )

         if all(pi.get("state") in RUNNING_STATES for pi in proc_info):
-            return DaemonStatus.RUNNING
+            if any(pi.get("state") == ProcessStates.STARTING for pi in proc_info):
+                return DaemonStatus.STARTING
+            else:
+                return DaemonStatus.RUNNING

         if any(pi.get("state") in RUNNING_STATES for pi in proc_info):
             return DaemonStatus.PARTIALLY_RUNNING
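With the new intermediate state, a caller that previously treated any all-running answer as RUNNING may now want to wait out STARTING first. A hedged sketch (the `from_project` factory method is an assumption, not taken from this diff; `ConfigManager` and `get_project` appear in the files above):

```python
import time

from jobflow_remote.config.manager import ConfigManager
from jobflow_remote.jobs.daemon import DaemonManager, DaemonStatus

project = ConfigManager().get_project()
dm = DaemonManager.from_project(project)  # assumed constructor
# Poll until supervisord reports the processes past the STARTING phase
while dm.check_status() == DaemonStatus.STARTING:
    time.sleep(1)
```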
Expand Down Expand Up @@ -483,12 +487,31 @@ def _verify_call_result(
return None

def kill(self, raise_on_error: bool = False) -> bool:
status = self.check_status()
if status == DaemonStatus.SHUT_DOWN:
logger.info("supervisord is not running. No process is running")
return True
# If the daemon is shutting down supervisord may not be able to identify
# the state. Try proceeding in that case, since we really want to kill
# the process
status = None
try:
status = self.check_status()
if status == DaemonStatus.SHUT_DOWN:
logger.info("supervisord is not running. No process is running")
return True
if status == DaemonStatus.STOPPED:
logger.info("Processes are already stopped.")
return True
except DaemonError as e:
msg = (
f"Error while determining the state of the runner: {getattr(e, 'message', str(e))}."
f"Proceeding with the kill command."
)
logger.warning(msg)

if status in (DaemonStatus.RUNNING, DaemonStatus.STOPPING):
if status in (
None,
DaemonStatus.RUNNING,
DaemonStatus.STOPPING,
DaemonStatus.PARTIALLY_RUNNING,
):
interface = self.get_interface()
result = interface.supervisor.signalAllProcesses(9)
error = self._verify_call_result(result, "kill", raise_on_error)