Skip to content

Commit

Permalink
7: Implement cancellations and persistency of jobs. Also add in robus…
Browse files Browse the repository at this point in the history
…tness to only accept updates from the Celery task that is successfully submitted.
  • Loading branch information
lfse-slafleur committed Mar 12, 2024
1 parent a194cbc commit c7e9c94
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 62 deletions.
13 changes: 3 additions & 10 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ black==22.1.0
# via orchestrator (pyproject.toml)
build==1.0.3
# via orchestrator (pyproject.toml)
celery[sqlalchemy]==5.3.6
celery==5.3.6
# via
# -c requirements.txt
# omotes-sdk-python
Expand All @@ -49,12 +49,6 @@ click-repl==0.3.0
# via
# -c requirements.txt
# celery
colorama==0.4.6
# via
# -c requirements.txt
# build
# click
# pytest
coverage[toml]==7.3.2
# via pytest-cov
dataclass-binder==0.3.4
Expand Down Expand Up @@ -130,7 +124,7 @@ protobuf==4.25.2
# via
# -c requirements.txt
# omotes-sdk-protocol
psycopg2==2.9.9
psycopg2-binary==2.9.9
# via
# -c requirements.txt
# orchestrator (pyproject.toml)
Expand Down Expand Up @@ -164,10 +158,9 @@ six==1.16.0
# python-dateutil
snowballstemmer==2.2.0
# via pydocstyle
sqlalchemy==2.0.23
sqlalchemy==2.0.28
# via
# -c requirements.txt
# celery
# orchestrator (pyproject.toml)
tomli==2.0.1
# via black
Expand Down
12 changes: 4 additions & 8 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ amqp==5.2.0
# via kombu
billiard==4.2.0
# via celery
celery[sqlalchemy]==5.3.6
celery==5.3.6
# via
# omotes-sdk-python
# orchestrator (pyproject.toml)
Expand All @@ -28,8 +28,6 @@ click-plugins==1.1.1
# via celery
click-repl==0.3.0
# via celery
colorama==0.4.6
# via click
dataclass-binder==0.3.4
# via orchestrator (pyproject.toml)
greenlet==3.0.1
Expand All @@ -50,18 +48,16 @@ prompt-toolkit==3.0.41
# via click-repl
protobuf==4.25.2
# via omotes-sdk-protocol
psycopg2==2.9.9
psycopg2-binary==2.9.9
# via orchestrator (pyproject.toml)
python-dateutil==2.8.2
# via celery
python-dotenv==1.0.0
# via orchestrator (pyproject.toml)
six==1.16.0
# via python-dateutil
sqlalchemy==2.0.23
# via
# celery
# orchestrator (pyproject.toml)
sqlalchemy==2.0.28
# via orchestrator (pyproject.toml)
typing-extensions==4.8.0
# via sqlalchemy
tzdata==2023.3
Expand Down
3 changes: 2 additions & 1 deletion src/omotes_orchestrator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
import os
from omotes_sdk.internal.common.app_logging import setup_logging, LogLevel

setup_logging(LogLevel.parse(os.environ.get("LOG_LEVEL", "DEBUG")), "omotes_orchestrator")
setup_logging(LogLevel.parse(os.environ.get("LOG_LEVEL_SQL", "WARNING")), "sqlalchemy.engine")
18 changes: 18 additions & 0 deletions src/omotes_orchestrator/celery_interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import uuid

from celery import Celery
Expand All @@ -7,6 +8,9 @@
from omotes_orchestrator.config import CeleryConfig


LOGGER = logging.getLogger("omotes_orchestrator")


class CeleryInterface:
"""Connect to the Celery app which orchestrates the workers."""

Expand Down Expand Up @@ -52,5 +56,19 @@ def start_workflow(
(job_id, input_esdl, params_dict),
queue=workflow_type.workflow_type_name,
).delay()
LOGGER.debug(
"Started celery task %s with job id %s celery id %s",
workflow_type.workflow_type_name,
job_id,
started_task.task_id,
)

return started_task.task_id

Check failure on line 66 in src/omotes_orchestrator/celery_interface.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Returning Any from function declared to return "str" [no-any-return]

def cancel_workflow(self, celery_id: str) -> None:
"""Cancel a running workflow.
:param celery_id: The task id Celery associated the workflow with.
"""
self.app.control.revoke(task_id=celery_id, terminate=True, signal="SIGTERM")
LOGGER.debug("Revoked job with celery id %s", celery_id)
6 changes: 4 additions & 2 deletions src/omotes_orchestrator/db_models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


class JobStatus(Enum):
REGISTERED = "registered"
SUBMITTED = "submitted"
RUNNING = "running"

Expand All @@ -20,9 +21,10 @@ class JobDB(Base):
__tablename__ = "job"

job_id: uuid.UUID = db.Column(UUID(as_uuid=True), primary_key=True)

Check failure on line 23 in src/omotes_orchestrator/db_models/job.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Incompatible types in assignment (expression has type "Column[UUID]", variable has type "UUID") [assignment]
celery_id: str = db.Column(db.String)
celery_id: str = db.Column(db.String, nullable=True)

Check failure on line 24 in src/omotes_orchestrator/db_models/job.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Incompatible types in assignment (expression has type "Column[str]", variable has type "str") [assignment]
workflow_type: str = db.Column(db.String)

Check failure on line 25 in src/omotes_orchestrator/db_models/job.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Incompatible types in assignment (expression has type "Column[str]", variable has type "str") [assignment]
status: JobStatus = db.Column(db.Enum(JobStatus), nullable=False)

Check failure on line 26 in src/omotes_orchestrator/db_models/job.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Incompatible types in assignment (expression has type "Column[<nothing>]", variable has type "JobStatus") [assignment]
registered_at: datetime = db.Column(db.DateTime(timezone=True), nullable=False)

Check failure on line 27 in src/omotes_orchestrator/db_models/job.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Incompatible types in assignment (expression has type "Column[datetime]", variable has type "datetime") [assignment]
submitted_at: datetime = db.Column(db.DateTime(timezone=True))

Check failure on line 28 in src/omotes_orchestrator/db_models/job.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Incompatible types in assignment (expression has type "Column[datetime]", variable has type "datetime") [assignment]
running_at: datetime = db.Column(db.DateTime(timezone=True))

Check failure on line 29 in src/omotes_orchestrator/db_models/job.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Incompatible types in assignment (expression has type "Column[datetime]", variable has type "datetime") [assignment]
timeout_after_ms: int = db.Column(db.Integer)
is_cancelled: bool = db.Column(db.Boolean, nullable=False, default=False)
Loading

0 comments on commit c7e9c94

Please sign in to comment.