Skip to content

Commit

Permalink
Merge pull request #44 from Project-OMOTES/43-fix-missing-init_barrie…
Browse files Browse the repository at this point in the history
…rs-when-cancel-or-result-is-received-for-a-job-but-the-job-was-already-done-and-update-sdk

43: Fix issues where a cancel or result was received but a job was al…
  • Loading branch information
lfse-slafleur authored Jun 17, 2024
2 parents d1f184b + 2428fc4 commit 37d1a11
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 41 deletions.
2 changes: 1 addition & 1 deletion computation-engine-at-orchestrator
15 changes: 1 addition & 14 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ mako==1.3.2
# via
# -c requirements.txt
# alembic
# pdoc3
markdown==3.5.2
# via
# -c requirements.txt
# pdoc3
markupsafe==2.1.5
# via
# -c requirements.txt
Expand All @@ -116,7 +111,7 @@ omotes-sdk-protocol==0.0.8
# via
# -c requirements.txt
# omotes-sdk-python
omotes-sdk-python==0.0.12
omotes-sdk-python==0.0.16
# via
# -c requirements.txt
# orchestrator (pyproject.toml)
Expand All @@ -131,10 +126,6 @@ pamqp==3.2.1
# aiormq
pathspec==0.11.2
# via black
pdoc3==0.10.0
# via
# -c requirements.txt
# streamcapture
platformdirs==4.1.0
# via black
pluggy==1.3.0
Expand Down Expand Up @@ -186,10 +177,6 @@ sqlalchemy[mypy]==2.0.28
# -c requirements.txt
# alembic
# orchestrator (pyproject.toml)
streamcapture==1.2.2
# via
# -c requirements.txt
# omotes-sdk-python
tomli==2.0.1
# via black
types-protobuf==4.24.0.20240302
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies = [
"sqlalchemy ~= 2.0.27",
"psycopg2-binary ~= 2.9.9",
"celery ~= 5.3.6",
"omotes-sdk-python ~= 0.0.12",
"omotes-sdk-python ~= 0.0.16",
"alembic ~= 1.13.1",
]

Expand Down Expand Up @@ -68,7 +68,7 @@ enabled = true

[tool.pytest.ini_options]
addopts = """--cov=omotes_orchestrator --cov-report html --cov-report term-missing \
--cov-fail-under 58"""
--cov-fail-under 57"""

[tool.coverage.run]
source = ["src"]
Expand Down
15 changes: 2 additions & 13 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,17 @@ idna==3.6
kombu==5.3.4
# via celery
mako==1.3.2
# via
# alembic
# pdoc3
markdown==3.5.2
# via pdoc3
# via alembic
markupsafe==2.1.5
# via mako
multidict==6.0.5
# via yarl
omotes-sdk-protocol==0.0.8
# via omotes-sdk-python
omotes-sdk-python==0.0.12
omotes-sdk-python==0.0.16
# via orchestrator (pyproject.toml)
pamqp==3.2.1
# via aiormq
pdoc3==0.10.0
# via streamcapture
prompt-toolkit==3.0.41
# via click-repl
protobuf==4.25.2
Expand All @@ -72,8 +66,6 @@ sqlalchemy==2.0.28
# via
# alembic
# orchestrator (pyproject.toml)
streamcapture==1.2.2
# via omotes-sdk-python
typing-extensions==4.8.0
# via
# alembic
Expand All @@ -91,6 +83,3 @@ yarl==1.9.4
# via
# aio-pika
# aiormq

# The following packages are considered to be unsafe in a requirements file:
# setuptools
32 changes: 21 additions & 11 deletions src/omotes_orchestrator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,20 +264,26 @@ def job_cancellation_handler(self, job_cancellation: JobCancel) -> None:
is cancelled.
If the job is registered in the database but no celery id is persisted, this is logged
as a warning.
as a warning. Rationale: The queue through which cancellations are received is a different
queue from where workers post results. Therefore, this function cannot rely on any ordering
of when a job is submitted, a progress update is received or when a result is received.
In other words, cancellations are only possible when the cancellation is received when the
job is submitted or active. In all other cases, the cancellation is ignored.
:param job_cancellation: Request to cancel a job.
"""
logger.info("Received job cancellation for job %s", job_cancellation.uuid)
job_id = uuid.UUID(job_cancellation.uuid)

self._init_barriers.wait_for_barrier(job_id)
job_db = self.postgresql_if.get_job(job_id)
if job_db and job_db.status == JobStatusDB.REGISTERED:
self._init_barriers.wait_for_barrier(job_id)
job_db = self.postgresql_if.get_job(job_id)

if job_db is None:
logger.warning(
"Received a request to cancel job %s but it was already completed, "
"cancelled or removed.",
"cancelled, removed or was not yet submitted.",
job_cancellation.uuid,
)
elif job_db.celery_id is None:
Expand Down Expand Up @@ -360,22 +366,23 @@ def task_result_received(self, serialized_message: bytes) -> None:
id=uuid.UUID(task_result.job_id),
workflow_type=workflow_type,
)
self._init_barriers.wait_for_barrier(job.id)

job_db = self.postgresql_if.get_job(job.id)
if job_db and job_db.status == JobStatusDB.REGISTERED:
self._init_barriers.wait_for_barrier(job.id)
job_db = self.postgresql_if.get_job(job.id)

# Confirm the job is still relevant.
if job_db is None:
logger.info("Ignoring result as job %s was already cancelled or completed.", job.id)
elif job_db.celery_id != task_result.celery_task_id:
logger.warning(
"Job %s has a result but was is not the celery task that was expected."
"Job %s has a result but it is not from the celery task that was expected."
"Ignoring result. Expected celery task id %s but received celery task id %s",
job.id,
job_db.celery_id,
task_result.celery_task_id,
)

elif task_result.result_type == TaskResult.ResultType.SUCCEEDED:
logger.info(
"Received succeeded result for job %s through task %s",
Expand Down Expand Up @@ -428,8 +435,8 @@ def task_progress_update(self, serialized_message: bytes) -> None:
progress_update = TaskProgressUpdate()
progress_update.ParseFromString(serialized_message)
logger.debug(
"Received progress update for job %s (celery task id %s) to progress %s with message: "
"%s",
"Received progress update for job %s (celery task id %s) to progress %s with "
"message: %s",
progress_update.job_id,
progress_update.celery_task_id,
progress_update.progress,
Expand All @@ -451,7 +458,6 @@ def task_progress_update(self, serialized_message: bytes) -> None:
)

job_db = self.postgresql_if.get_job(job.id)

if job_db and job_db.status == JobStatusDB.REGISTERED:
self._init_barriers.wait_for_barrier(job.id)
job_db = self.postgresql_if.get_job(job.id)
Expand All @@ -467,9 +473,13 @@ def task_progress_update(self, serialized_message: bytes) -> None:
return
elif job_db.celery_id != progress_update.celery_task_id:
logger.warning(
"Job %s has a progress update but was not successfully submitted yet."
"Ignoring progress update and cancelling this task.",
"Job %s has a progress update but it is not from the celery task that was "
"expected. Ignoring result. Expected celery task id %s but received celery "
"task id %s. Cancelling celery task with id %s",
job.id,
job_db.celery_id,
progress_update.celery_task_id,
progress_update.celery_task_id,
)
self.celery_if.cancel_workflow(progress_update.celery_task_id)
return
Expand Down

0 comments on commit 37d1a11

Please sign in to comment.