Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

70: Forward job_reference from JobSubmission if available to Celery t… #71

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/omotes_orchestrator/celery_interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import uuid
from typing import Optional

from celery import Celery
from celery.result import AsyncResult
Expand Down Expand Up @@ -41,20 +42,26 @@ def stop(self) -> None:
self.app.close()

def start_workflow(
self, workflow_type: WorkflowType, job_id: uuid.UUID, input_esdl: str, params_dict: dict
self,
workflow_type: WorkflowType,
job_id: uuid.UUID,
job_reference: Optional[str],
input_esdl: str,
params_dict: dict,
) -> str:
"""Start a new workflow.

:param workflow_type: Type of workflow to start. Currently, this translates directly to
a Celery task with the same name.
:param job_id: The OMOTES ID of the job.
:param job_reference: The reference to the job supplied by the user.
:param input_esdl: The ESDL to perform the task on.
:param params_dict: The additional, non-ESDL, job parameters.
:return: Celery task id.
"""
started_task: AsyncResult = self.app.signature(
workflow_type.workflow_type_name,
(job_id, input_esdl, params_dict),
(job_id, job_reference, input_esdl, params_dict),
queue=workflow_type.workflow_type_name,
).delay()
LOGGER.debug(
Expand Down
21 changes: 17 additions & 4 deletions src/omotes_orchestrator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,9 @@
job_uuid = uuid.UUID(job_submission.uuid)
if workflow_type is None:
logger.warning(
"Received a new job (id %s) with unknown workflow type %s. Ignoring job.",
"Received a new job (id %s, reference %s) with unknown workflow type %s. Ignoring job.",
job_submission.uuid,
job_submission.job_reference,

Check failure on line 281 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: "JobSubmission" has no attribute "job_reference" [attr-defined]
job_submission.workflow_type,
)
self.omotes_sdk_if.send_job_result_by_job_id(
Expand All @@ -295,7 +296,10 @@
job = Job(job_uuid, workflow_type)

logger.info(
"Received new job %s for workflow type %s", job.id, job_submission.workflow_type
"Received new job %s with reference %s for workflow type %s",
job.id,
job_submission.job_reference,

Check failure on line 301 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: "JobSubmission" has no attribute "job_reference" [attr-defined]
job_submission.workflow_type,
)
submitted_job_id = uuid.UUID(job_submission.uuid)

Expand All @@ -309,7 +313,7 @@
"New job %s was already registered previously. Will be submitted %s", job.id, submit
)
else:
logger.debug("New job %s was not yet registered. Registering and submitting.")
logger.debug("New job %s was not yet registered. Registering and submitting.", job.id)

if not job_submission.HasField("timeout_ms"):
timeout_after_ms = None
Expand All @@ -327,16 +331,25 @@
submit = True

if submit:
job_reference = None
if job_submission.HasField("job_reference"):

Check failure on line 335 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Argument 1 to "HasField" of "JobSubmission" has incompatible type "Literal['job_reference']"; expected "Literal['_timeout_ms', b'_timeout_ms', 'params_dict', b'params_dict', 'timeout_ms', b'timeout_ms']" [arg-type]
job_reference = job_submission.job_reference

Check failure on line 336 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: "JobSubmission" has no attribute "job_reference" [attr-defined]

self._init_barriers.ensure_barrier(submitted_job_id)
celery_task_id = self.celery_if.start_workflow(
job.workflow_type,
job.id,
job_reference,
job_submission.esdl,
json_format.MessageToDict(job_submission.params_dict),
)

self.postgresql_if.set_job_submitted(job.id, celery_task_id)
logger.debug("New job %s has been submitted.", job.id)
logger.debug(
"New job %s with reference %s has been submitted.",
job.id,
job_submission.job_reference,

Check failure on line 351 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: "JobSubmission" has no attribute "job_reference" [attr-defined]
)
self._init_barriers.set_barrier(submitted_job_id)

def job_cancellation_handler(self, job_cancellation: JobCancel) -> None:
Expand Down
Loading