Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: Implement custom RunScheduler #16

Merged
merged 19 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ recursive-include invenio_jobs *.html
recursive-include invenio_jobs *.js
recursive-include invenio_jobs/translations *.po *.pot *.mo
recursive-include tests *.py
recursive-include invenio_jobs *.py
97 changes: 97 additions & 0 deletions invenio_jobs/alembic/356496a01197_create_invenio_jobs_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#
# This file is part of Invenio.
# Copyright (C) 2016-2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Create invenio-jobs tables."""

import sqlalchemy as sa
import sqlalchemy_utils
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "356496a01197"
# Parent revision: "371f4cbcb73d" is the revision that creates the
# invenio-jobs branch.
down_revision = "371f4cbcb73d"
# No branch label is declared on this revision.
branch_labels = ()
depends_on = None


def upgrade():
    """Upgrade database.

    Creates the ``job`` table first and then the ``run`` table, whose foreign
    keys point at ``job.id`` and ``accounts_user.id``.
    """

    def json_type():
        """Return a new JSON column type with its per-dialect variants."""
        return (
            sa.JSON()
            .with_variant(sqlalchemy_utils.types.json.JSONType(), "mysql")
            .with_variant(
                postgresql.JSONB(none_as_null=True, astext_type=sa.Text()),
                "postgresql",
            )
            .with_variant(sqlalchemy_utils.types.json.JSONType(), "sqlite")
        )

    uuid_type = sqlalchemy_utils.types.uuid.UUIDType

    # ### commands auto generated by Alembic - please adjust! ###
    job_items = (
        sa.Column("created", sa.DateTime(), nullable=False),
        sa.Column("updated", sa.DateTime(), nullable=False),
        sa.Column("id", uuid_type(), nullable=False),
        sa.Column("active", sa.Boolean(), nullable=False),
        sa.Column("title", sa.String(length=255), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("task", sa.String(length=255), nullable=True),
        sa.Column("default_queue", sa.String(length=64), nullable=True),
        sa.Column("default_args", json_type(), nullable=True),
        sa.Column("schedule", json_type(), nullable=True),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_job")),
    )
    op.create_table("job", *job_items)

    run_items = (
        sa.Column("created", sa.DateTime(), nullable=False),
        sa.Column("updated", sa.DateTime(), nullable=False),
        sa.Column("id", uuid_type(), nullable=False),
        sa.Column("job_id", uuid_type(), nullable=True),
        sa.Column("started_by_id", sa.Integer(), nullable=True),
        sa.Column("started_at", sa.DateTime(), nullable=True),
        sa.Column("finished_at", sa.DateTime(), nullable=True),
        sa.Column("task_id", uuid_type(), nullable=True),
        sa.Column("status", sa.CHAR(1), nullable=False),
        sa.Column("message", sa.Text(), nullable=True),
        sa.Column("title", sa.Text(), nullable=True),
        sa.Column("args", json_type(), nullable=True),
        sa.Column("queue", sa.String(length=64), nullable=False),
        sa.ForeignKeyConstraint(
            ["job_id"], ["job.id"], name=op.f("fk_run_job_id_job")
        ),
        sa.ForeignKeyConstraint(
            ["started_by_id"],
            ["accounts_user.id"],
            name=op.f("fk_run_started_by_id_accounts_user"),
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_run")),
    )
    op.create_table("run", *run_items)
    # ### end Alembic commands ###
# ### end Alembic commands ###


def downgrade():
    """Downgrade database.

    Drops ``run`` before ``job``, since ``run`` holds a foreign key to
    ``job.id``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    for table_name in ("run", "job"):
        op.drop_table(table_name)
# ### end Alembic commands ###
27 changes: 27 additions & 0 deletions invenio_jobs/alembic/371f4cbcb73d_create_invenio_jobs_branch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# This file is part of Invenio.
# Copyright (C) 2016-2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Create invenio-jobs branch."""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "371f4cbcb73d"
# No parent revision: this is the first revision carrying the branch label
# declared below.
down_revision = None
branch_labels = ("invenio_jobs",)
# NOTE(review): presumably a revision owned by another package that must be
# applied first -- confirm which package provides "dbdbc1b19cf2".
depends_on = "dbdbc1b19cf2"


def upgrade():
    """Upgrade database.

    No-op: this revision only introduces the ``invenio_jobs`` branch label and
    performs no schema changes.
    """
    return None


def downgrade():
    """Downgrade database.

    No-op: the matching upgrade performs no schema changes, so there is
    nothing to revert.
    """
    return None
20 changes: 18 additions & 2 deletions invenio_jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@

import enum
import uuid
from copy import deepcopy
from datetime import timedelta
from inspect import signature

from celery import current_app as current_celery_app
from celery.schedules import crontab
from invenio_accounts.models import User
from invenio_db import db
from invenio_users_resources.records import UserAggregate
Expand Down Expand Up @@ -47,6 +50,19 @@ def last_run(self):
"""Last run of the job."""
return self.runs.order_by(Run.created.desc()).first()

@property
def parsed_schedule(self):
    """Return schedule parsed as crontab or timedelta.

    The stored schedule is a mapping whose ``"type"`` key selects the result
    type; every other key is passed as a keyword argument to ``crontab`` or
    ``timedelta``. Returns ``None`` when no schedule is set or when the type
    is neither ``"crontab"`` nor ``"interval"``.
    """
    if not self.schedule:
        return None

    # Work on a copy so popping "type" does not alter the stored schedule.
    params = deepcopy(self.schedule)
    kind = params.pop("type")
    if kind == "interval":
        return timedelta(**params)
    if kind == "crontab":
        return crontab(**params)
    return None


class RunStatusEnum(enum.Enum):
"""Enumeration of a run's possible states."""
Expand All @@ -56,8 +72,8 @@ class RunStatusEnum(enum.Enum):
SUCCESS = "S"
FAILED = "F"
WARNING = "W"
CANCELLING = "P"
CANCELLED = "C"
CANCELLING = "C"
CANCELLED = "X"


class Run(db.Model, Timestamp):
Expand Down
27 changes: 14 additions & 13 deletions invenio_jobs/services/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@
"""Service permissions."""

from invenio_administration.generators import Administration
from invenio_records_permissions.generators import SystemProcess
from invenio_records_permissions.policies import BasePermissionPolicy


class TasksPermissionPolicy(BasePermissionPolicy):
    """Access control configuration for tasks."""

    # Each action below is assigned twice. When the class body executes, the
    # later assignment (the one that also lists SystemProcess) is the value
    # that remains bound.
    can_search = [Administration()]
    can_read = [Administration()]
    can_search = [Administration(), SystemProcess()]
    can_read = [Administration(), SystemProcess()]


class JobPermissionPolicy(BasePermissionPolicy):
    """Access control configuration for jobs."""

    # Each action below is assigned twice. When the class body executes, the
    # later assignment (the one that also lists SystemProcess) is the value
    # that remains bound.
    can_search = [Administration()]
    can_create = [Administration()]
    can_read = [Administration()]
    can_update = [Administration()]
    can_delete = [Administration()]
    can_search = [Administration(), SystemProcess()]
    can_create = [Administration(), SystemProcess()]
    can_read = [Administration(), SystemProcess()]
    can_update = [Administration(), SystemProcess()]
    can_delete = [Administration(), SystemProcess()]


class RunPermissionPolicy(BasePermissionPolicy):
Expand All @@ -35,9 +36,9 @@ class RunPermissionPolicy(BasePermissionPolicy):
Later the runs may be done by librarians.
"""

can_search = [Administration()]
can_create = [Administration()]
can_read = [Administration()]
can_update = [Administration()]
can_delete = [Administration()]
can_stop = [Administration()]
can_search = [Administration(), SystemProcess()]
can_create = [Administration(), SystemProcess()]
can_read = [Administration(), SystemProcess()]
can_update = [Administration(), SystemProcess()]
can_delete = [Administration(), SystemProcess()]
can_stop = [Administration(), SystemProcess()]
115 changes: 115 additions & 0 deletions invenio_jobs/services/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Custom Celery RunScheduler."""

import traceback
import uuid
from typing import Any

from celery.beat import ScheduleEntry, Scheduler, logger
from invenio_db import db

from invenio_jobs.models import Job, Run, Task
from invenio_jobs.tasks import execute_run


class JobEntry(ScheduleEntry):
    """Entry for celery beat.

    A ``ScheduleEntry`` that additionally keeps a reference to the job it was
    built from, so the scheduler can look the job up again by id.
    """

    # Job this entry was created from; set per instance in ``__init__``.
    job = None

    def __init__(self, job, *args, **kwargs):
        """Initialise entry.

        :param job: Job this entry belongs to.
        :param args: Positional arguments forwarded to ``ScheduleEntry``.
        :param kwargs: Keyword arguments forwarded to ``ScheduleEntry``.
        """
        self.job = job
        super().__init__(*args, **kwargs)

    @classmethod
    def from_job(cls, job):
        """Create JobEntry from job.

        :param job: Job providing title, schedule, default args and queue.
        :returns: A new entry that sends the ``execute_run`` task.
        """
        # ``Job.last_run`` issues a query on every access, so read it once
        # instead of once for the check and once for the value.
        last_run = job.last_run
        return cls(
            job=job,
            name=job.title,
            schedule=job.parsed_schedule,
            kwargs={"kwargs": job.default_args},
            task=execute_run.name,
            options={"queue": job.default_queue},
            last_run_at=last_run.created if last_run else None,
        )


class RunScheduler(Scheduler):
    """Custom beat scheduler for runs.

    Keeps one ``JobEntry`` per active job, keyed by job id, and creates a
    ``Run`` row each time a due entry is sent.
    """

    Entry = JobEntry
    # Mapping of job id -> JobEntry; replaced wholesale by ``sync``.
    entries = {}

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the database scheduler."""
        super().__init__(*args, **kwargs)

    @property
    def schedule(self):
        """Get currently scheduled entries."""
        return self.entries

    # Celery internal override
    def setup_schedule(self):
        """Setup schedule."""
        self.sync()

    # Celery internal override
    def reserve(self, entry):
        """Update entry to next run execution time.

        :param entry: The entry that is about to be sent.
        :returns: The follow-up entry now stored under the job's id.
        """
        new_entry = self.schedule[entry.job.id] = next(entry)
        return new_entry

    # Celery internal override
    def apply_entry(self, entry, producer=None):
        """Create and apply a JobEntry.

        Creates a ``Run`` for the entry's job, then sends the task using the
        run's task id and passing the run id as the only positional argument.
        Any exception is logged rather than propagated.
        """
        with self.app.flask_app.app_context():
            logger.info("Scheduler: Sending due task %s (%s)", entry.name, entry.task)
            try:
                # TODO Only create and send task if there is no "stale" run (status running, starttime > hour, Run pending for > 1 hr)
                run = self.create_run(entry)
                entry.options["task_id"] = str(run.task_id)
                entry.args = (str(run.id),)
                result = self.apply_async(entry, producer=producer, advance=False)
            except Exception as exc:
                logger.error(
                    "Message Error: %s\n%s",
                    exc,
                    traceback.format_stack(),
                    exc_info=True,
                )
            else:
                if result and hasattr(result, "id"):
                    logger.debug("%s sent. id->%s", entry.task, result.id)
                else:
                    logger.debug("%s sent.", entry.task)

    # Celery internal override
    def sync(self):
        """Sync Jobs from db to the scheduler."""
        # TODO Should we also have a cleanup task for runs? "stale" run (status running, starttime > hour, Run pending for > 1 hr)
        with self.app.flask_app.app_context():
            jobs = Job.query.filter_by(active=True).all()
            # Rebuild from scratch because some jobs might be deactivated.
            # The new mapping is built before it is assigned, so a failure
            # while building leaves the previous entries in place.
            self.entries = {job.id: JobEntry.from_job(job) for job in jobs}

    def create_run(self, entry):
        """Create run from a JobEntry.

        :param entry: Entry whose job the run is created for.
        :returns: The committed ``Run`` with a freshly generated task id.
        """
        job = Job.query.filter_by(id=entry.job.id).one()
        run = Run(
            job=job,
            args=job.default_args,
            queue=job.default_queue,
            task_id=uuid.uuid4(),
        )
        # Add explicitly: do not rely on the run being cascaded into the
        # session through its relationship to ``job``.
        db.session.add(run)
        db.session.commit()
        return run
Loading