feat(shutdown): stop all running jobs before stopping workflow (reanahub#423)

Make sure that all running jobs are stopped before stopping the
run-batch pod, as otherwise they would keep running and would never be
cleaned up.

Closes reanahub/reana-workflow-controller#546
mdonadoni committed Feb 1, 2024
1 parent 1fc117e commit 866675b
Showing 4 changed files with 197 additions and 4 deletions.
49 changes: 48 additions & 1 deletion docs/openapi.json
@@ -410,7 +410,54 @@
},
"summary": "Returns the logs for a given job."
}
},
"/shutdown": {
"delete": {
"consumes": [
"application/json"
],
"description": "All running jobs will be stopped and no more jobs will be scheduled. Kubernetes will call this endpoint before stopping the pod (PreStop hook).",
"operationId": "shutdown",
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "Request successful. All jobs were stopped.",
"examples": {
"application/json": {
"message": "All jobs stopped."
}
},
"schema": {
"properties": {
"message": {
"type": "string"
}
},
"type": "object"
}
},
"500": {
"description": "Request failed. Something went wrong while stopping the jobs.",
"examples": {
"application/json": {
"message": "Could not stop jobs cdcf48b1-c2f3-4693-8230-b066e088444c"
}
},
"schema": {
"properties": {
"message": {
"type": "string"
}
},
"type": "object"
}
}
},
"summary": "Stop reana-job-controller"
}
}
},
"swagger": "2.0"
}
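
The new endpoint is meant to be called by Kubernetes through a PreStop hook before the run-batch pod is terminated. As a rough illustration (not part of this commit), the job-controller container spec could register such a hook along the following lines; the container name, image and port 5000 are assumptions:

# Hypothetical sketch of the job-controller container in the run-batch pod.
# Only the idea of a preStop hook hitting /shutdown comes from this commit;
# the name, image and port are placeholders.
job_controller_container = {
    "name": "job-controller",
    "image": "docker.io/reanahub/reana-job-controller",
    "ports": [{"containerPort": 5000}],
    "lifecycle": {
        # Kubernetes runs this handler before sending SIGTERM to the container.
        "preStop": {
            "httpGet": {"path": "/shutdown", "port": 5000},
        },
    },
}
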
10 changes: 7 additions & 3 deletions reana_job_controller/job_monitor.py
@@ -113,7 +113,11 @@ def should_process_job(self, job_pod) -> bool:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Pod.md)
"""
remaining_jobs = self._get_remaining_jobs(
statuses_to_skip=[JobStatus.finished.name, JobStatus.failed.name]
statuses_to_skip=[
JobStatus.finished.name,
JobStatus.failed.name,
JobStatus.stopped.name,
]
)
backend_job_id = self.get_backend_job_id(job_pod)
is_job_in_remaining_jobs = backend_job_id in remaining_jobs
@@ -291,7 +295,7 @@ def watch_jobs(self, job_db, app):
:param job_db: Dictionary which contains all current jobs.
"""
ignore_hold_codes = [35, 16]
statuses_to_skip = ["finished", "failed"]
statuses_to_skip = ["finished", "failed", "stopped"]
while True:
try:
logging.info("Starting a new stream request to watch Condor Jobs")
@@ -422,7 +426,7 @@ def watch_jobs(self, job_db, app=None):
banner_timeout=SLURM_SSH_BANNER_TIMEOUT,
auth_timeout=SLURM_SSH_AUTH_TIMEOUT,
)
statuses_to_skip = ["finished", "failed"]
statuses_to_skip = ["finished", "failed", "stopped"]
while True:
logging.debug("Starting a new stream request to watch Jobs")
try:
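
The job monitors (Kubernetes, HTCondor, Slurm) now also skip jobs in the "stopped" state, so that jobs halted during shutdown are not picked up again. A minimal sketch of this kind of status filtering over an in-memory job dictionary (illustrative only, not the actual _get_remaining_jobs implementation):

# Illustrative only: return backend IDs of jobs that still need monitoring,
# skipping terminal statuses, which now include "stopped".
def get_remaining_jobs(job_db, statuses_to_skip):
    return [
        job["backend_job_id"]
        for job in job_db.values()
        if not job.get("deleted") and job["status"] not in statuses_to_skip
    ]

remaining = get_remaining_jobs(
    {"job-1": {"backend_job_id": "pod-abc", "status": "running"}},
    statuses_to_skip=["finished", "failed", "stopped"],
)
# remaining == ["pod-abc"]
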
2 changes: 2 additions & 0 deletions reana_job_controller/kubernetes_job_manager.py
@@ -172,6 +172,8 @@ def execute(self):
"initContainers": [],
"volumes": [],
"restartPolicy": "Never",
# No need to wait a long time for jobs to gracefully terminate
"terminationGracePeriodSeconds": 5,
"enableServiceLinks": False,
},
},
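
Setting terminationGracePeriodSeconds to 5 shortens the time Kubernetes waits between SIGTERM and SIGKILL when a job pod is deleted, so stopped jobs disappear quickly instead of lingering for the default 30 seconds. A minimal, hypothetical pod manifest showing where the field sits (only the grace period reflects this commit; everything else is a placeholder):

# Hypothetical minimal pod manifest; only terminationGracePeriodSeconds
# mirrors the change above, the rest are placeholder values.
job_pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "reana-run-job-example"},
    "spec": {
        "containers": [
            {
                "name": "job",
                "image": "docker.io/library/busybox",
                "command": ["sleep", "3600"],
            }
        ],
        "restartPolicy": "Never",
        # Wait at most 5 seconds after SIGTERM before the pod is force-killed.
        "terminationGracePeriodSeconds": 5,
    },
}
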
140 changes: 140 additions & 0 deletions reana_job_controller/rest.py
@@ -11,6 +11,7 @@
import copy
import json
import logging
import threading

from flask import Blueprint, current_app, jsonify, request
from sqlalchemy.exc import OperationalError
@@ -19,6 +20,9 @@
REANAKubernetesWrongMemoryFormat,
)

from reana_db.models import JobStatus


from reana_job_controller.errors import ComputingBackendSubmissionError
from reana_job_controller.job_db import (
JOB_DB,
@@ -28,16 +32,62 @@
retrieve_backend_job_id,
retrieve_job,
retrieve_job_logs,
store_job_logs,
update_job_status,
)
from reana_job_controller.schemas import Job, JobRequest
from reana_job_controller.utils import update_workflow_logs
from reana_job_controller import config


blueprint = Blueprint("jobs", __name__)

job_request_schema = JobRequest()
job_schema = Job()


class JobCreationCondition:
"""Mechanism used to synchronize the creation of jobs.
This is used to make sure no thread is able to create new jobs during or after
the shutdown procedure, as otherwise some jobs might not be correctly stopped and
    cleaned up. Multiple jobs can still be created in parallel; this works similarly
    to a readers-writer (RW) lock.
"""

def __init__(self):
"""Initialise a new JobCreationCondition."""
self.condition = threading.Condition()
self.creation_permitted = True
self.ongoing_creations = 0
"""Keep track of the number of ongoing job creations"""

def start_creation(self) -> bool:
"""Check if a new job can be created."""
with self.condition:
if not self.creation_permitted:
return False
self.ongoing_creations += 1
return True

def stop_creation(self):
"""Notify that the creation of the job has finished."""
with self.condition:
self.ongoing_creations -= 1
if self.ongoing_creations == 0:
self.condition.notify_all()

def disable_creation(self):
"""Do not permit to create any new jobs."""
with self.condition:
            # wait until all job creations are finished
while self.ongoing_creations != 0:
self.condition.wait()
self.creation_permitted = False


job_creation_condition = JobCreationCondition()
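
The condition behaves like a readers-writer lock: request handlers take "reader" slots while creating jobs, and the shutdown path acts as the single "writer" that waits for in-flight creations to drain before blocking new ones. An illustrative calling pattern using the job_creation_condition instance above (the handler names are made up; the real usage is in create_job and shutdown below):

# Illustrative calling pattern only; these helpers are not part of the commit.
def handle_create_request():
    if not job_creation_condition.start_creation():
        return "shutting down, refusing new jobs"
    try:
        pass  # the actual job submission would happen here
    finally:
        # Always release the slot, even if submission fails.
        job_creation_condition.stop_creation()


def handle_shutdown_request():
    # Blocks until all ongoing creations finish, then forbids new ones.
    job_creation_condition.disable_creation()
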


@blueprint.route("/job_cache", methods=["GET"])
def check_if_cached():
r"""Check if job is cached.
@@ -227,6 +277,10 @@ def create_job(): # noqa
return jsonify({"message": e.message}), 403
except REANAKubernetesWrongMemoryFormat as e:
return jsonify({"message": e.message}), 400

if not job_creation_condition.start_creation():
return jsonify({"message": "Cannot create new jobs, shutting down"}), 400

try:
backend_jod_id = job_obj.execute()
except OperationalError as e:
@@ -237,6 +291,9 @@ def create_job(): # noqa
msg = f"Job submission failed. \n{e}"
logging.error(msg, exc_info=True)
return jsonify({"message": msg}), 500
finally:
job_creation_condition.stop_creation()

if job_obj:
job = copy.deepcopy(job_request)
job["status"] = "started"
@@ -419,6 +476,89 @@ def delete_job(job_id): # noqa
return jsonify({"message": "The job {} doesn't exist".format(job_id)}), 404


@blueprint.route("/shutdown", methods=["GET"])
def shutdown():
r"""Stop reana-job-controller.
All running jobs will be stopped and no more jobs will be scheduled.
Kubernetes will call this endpoint before stopping the pod (PreStop hook).
---
delete:
summary: Stop reana-job-controller
description: >-
All running jobs will be stopped and no more jobs will be scheduled.
Kubernetes will call this endpoint before stopping the pod (PreStop hook).
operationId: shutdown
consumes:
- application/json
produces:
- application/json
responses:
200:
description: >-
Request successful. All jobs were stopped.
schema:
type: object
properties:
message:
type: string
examples:
application/json:
{"message": "All jobs stopped."}
500:
description: >-
Request failed. Something went wrong while stopping the jobs.
schema:
type: object
properties:
message:
type: string
examples:
application/json:
{"message": "Could not stop jobs cdcf48b1-c2f3-4693-8230-b066e088444c"}
"""
logging.info("Starting shutdown")
job_creation_condition.disable_creation()

    # No new jobs can be scheduled at this point; now stop all the running ones.

jobs = retrieve_all_jobs()
failed_to_stop = []

# jobs is a list of dicts, where each dict has a single entry.
# the key of the dict is the job ID, the value contains the job details.
for job_dict in jobs:
for job_id, job in job_dict.items():
if job["status"] in ("finished", "failed", "stopped"):
continue

backend_job_id = retrieve_backend_job_id(job_id)
# FIXME: ideally we would not be accessing the database manually here
# to get the compute backend and the workspace, but this can wait for a general
# refactor of the "in-memory" database
compute_backend = JOB_DB[job_id]["compute_backend"]
workspace = JOB_DB[job_id]["obj"].workflow_workspace
job_manager_cls = config.COMPUTE_BACKENDS[compute_backend]()
logging.info(f"Stopping job {job_id} ({backend_job_id})")
try:
logs = job_manager_cls.get_logs(backend_job_id, workspace=workspace)
store_job_logs(job_id, logs)
job_manager_cls.stop(backend_job_id)
update_job_status(job_id, JobStatus.stopped.name)
# FIXME: ideally also here we would not access the database directly
JOB_DB[job_id]["deleted"] = True
except Exception:
logging.exception(f"Could not stop job {job_id} ({backend_job_id})")
                failed_to_stop.append(job_id)

if failed_to_stop:
return (
jsonify({"message": "Could not stop jobs " + ", ".join(failed_to_stop)}),
500,
)
return jsonify({"message": "All jobs stopped."}), 200


@blueprint.route("/apispec", methods=["GET"])
def get_openapi_spec():
"""Get OpenAPI Spec."""
