Skip to content

Commit

Permalink
global: implement Runs backend
Browse files Browse the repository at this point in the history
  • Loading branch information
slint committed May 27, 2024
1 parent 13d1af1 commit c6efffb
Show file tree
Hide file tree
Showing 14 changed files with 439 additions and 65 deletions.
1 change: 1 addition & 0 deletions invenio_jobs/administration/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# details.

"""Invenio administration Runs view module."""

from invenio_administration.views.base import AdminResourceListView
from invenio_i18n import lazy_gettext as _

Expand Down
15 changes: 11 additions & 4 deletions invenio_jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from celery import current_app as current_celery_app
from invenio_accounts.models import User
from invenio_db import db
from invenio_users_resources.records import UserAggregate
from sqlalchemy.dialects import postgresql
from sqlalchemy_utils import Timestamp
from sqlalchemy_utils.types import ChoiceType, JSONType, UUIDType
Expand Down Expand Up @@ -67,11 +68,17 @@ class Run(db.Model, Timestamp):
job = db.relationship(Job, backref=db.backref("runs", lazy="dynamic"))

started_by_id = db.Column(db.Integer, db.ForeignKey(User.id), nullable=True)
started_by = db.relationship(User)
_started_by = db.relationship(User)

@property
def started_by(self):
if self._started_by:
return UserAggregate.from_model(self._started_by)

started_at = db.Column(db.DateTime, nullable=True)
finished_at = db.Column(db.DateTime, nullable=False)
finished_at = db.Column(db.DateTime, nullable=True)

task_id = db.Column(UUIDType, nullable=True)
status = db.Column(
ChoiceType(RunStatusEnum, impl=db.String(1)),
nullable=False,
Expand All @@ -80,9 +87,9 @@ class Run(db.Model, Timestamp):

message = db.Column(db.Text, nullable=True)

task_id = db.Column(UUIDType, nullable=True)
title = db.Column(db.Text, nullable=True)
args = db.Column(JSON, default=lambda: dict(), nullable=True)
queue = db.Column(db.String(64), nullable=True)
queue = db.Column(db.String(64), nullable=False)


class Task:
Expand Down
36 changes: 24 additions & 12 deletions invenio_jobs/resources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from invenio_records_resources.resources.records.args import SearchRequestArgsSchema
from invenio_records_resources.services.base.config import ConfiguratorMixin

from ..services.errors import JobNotFoundError
from invenio_jobs.models import RunStatusEnum

from ..services import errors

response_handlers = {
**ResourceConfig.response_handlers,
Expand All @@ -28,6 +30,18 @@
"application/json"
],
}
error_handlers = {
**ErrorHandlersMixin.error_handlers,
errors.JobNotFoundError: create_error_handler(
lambda e: HTTPJSONException(code=404, description=e.description)
),
errors.RunNotFoundError: create_error_handler(
lambda e: HTTPJSONException(code=404, description=e.description)
),
errors.RunStatusChangeError: create_error_handler(
lambda e: HTTPJSONException(code=400, description=e.description)
),
}


class TasksResourceConfig(ResourceConfig, ConfiguratorMixin):
Expand Down Expand Up @@ -71,13 +85,13 @@ class JobsResourceConfig(ResourceConfig, ConfiguratorMixin):

# Response handling
response_handlers = response_handlers
error_handlers = error_handlers

error_handlers = {
**ErrorHandlersMixin.error_handlers,
JobNotFoundError: create_error_handler(
lambda e: HTTPJSONException(code=404, description=e.description)
),
}

class RunsSearchRequestArgsSchema(SearchRequestArgsSchema):
"""Runs search request parameters."""

status = ma.fields.Enum(RunStatusEnum)


class RunsResourceConfig(ResourceConfig, ConfiguratorMixin):
Expand All @@ -95,16 +109,14 @@ class RunsResourceConfig(ResourceConfig, ConfiguratorMixin):
}

# Request handling
request_read_args = {}
request_view_args = {
"job_id": ma.fields.UUID(),
"run_id": ma.fields.UUID(),
}
request_search_args = RunsSearchRequestArgsSchema
request_body_parsers = request_body_parsers

# Response handling
response_handlers = response_handlers

error_handlers = {
**ErrorHandlersMixin.error_handlers,
# TODO: Add custom error handlers here
}
error_handlers = error_handlers
21 changes: 15 additions & 6 deletions invenio_jobs/resources/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def create_url_rules(self):
routes = self.config.routes
url_rules = [
route("GET", routes["list"], self.search),
route("POST", routes["item"], self.create),
route("POST", routes["list"], self.create),
route("DELETE", routes["item"], self.delete),
route("GET", routes["logs_list"], self.logs),
]
Expand All @@ -154,23 +154,27 @@ def create_url_rules(self):
# Primary Interface
#
@request_search_args
@request_view_args
@response_handler(many=True)
def search(self):
"""Perform a search."""
identity = g.identity
hits = self.service.search(
identity=identity,
job_id=resource_requestctx.view_args["job_id"],
params=resource_requestctx.args,
)
return hits.to_dict(), 200

@request_data
@request_view_args
@response_handler()
def create(self):
"""Create an item."""
item = self.service.create(
g.identity,
resource_requestctx.data or {},
job_id=resource_requestctx.view_args["job_id"],
data=resource_requestctx.data or {},
)
return item.to_dict(), 201

Expand All @@ -181,20 +185,23 @@ def logs(self):
identity = g.identity
hits = self.service.search(
identity=identity,
job_id=resource_requestctx.view_args["job_id"],
run_id=resource_requestctx.view_args["run_id"],
params=resource_requestctx.args,
)
return hits.to_dict(), 200

@request_headers
@request_view_args
@request_data
@request_view_args
@response_handler()
def update(self):
"""Update an item."""
item = self.service.update(
g.identity,
resource_requestctx.view_args["id"],
resource_requestctx.data,
job_id=resource_requestctx.view_args["job_id"],
run_id=resource_requestctx.view_args["run_id"],
data=resource_requestctx.data,
)
return item.to_dict(), 200

Expand All @@ -204,6 +211,8 @@ def delete(self):
"""Delete an item."""
self.service.delete(
g.identity,
resource_requestctx.view_args["id"],
job_id=resource_requestctx.view_args["job_id"],
run_id=resource_requestctx.view_args["run_id"],
data=resource_requestctx.view_args["id"],
)
return "", 204
22 changes: 15 additions & 7 deletions invenio_jobs/services/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

from ..models import Job, Run, Task
from . import results
from .links import JobLink
from .links import JobLink, RunLink
from .permissions import JobPermissionPolicy, RunPermissionPolicy, TasksPermissionPolicy
from .schema import JobSchema, TaskSchema
from .schema import JobSchema, RunSchema, TaskSchema


class TasksSearchOptions(SearchOptionsBase):
Expand Down Expand Up @@ -108,7 +108,15 @@ class JobsServiceConfig(ServiceConfig, ConfiguratorMixin):
class RunSearchOptions(SearchOptionsBase):
"""Run search options."""

# TODO: See what we need to override
sort_default = "created"
sort_direction_default = "desc"
sort_direction_options = {
"asc": dict(title=_("Ascending"), fn=asc),
"desc": dict(title=_("Descending"), fn=desc),
}
sort_options = {"created": dict(title=_("Created"), fields=["created"])}

pagination_options = {"default_results_per_page": 25}


class RunsServiceConfig(ServiceConfig, ConfiguratorMixin):
Expand All @@ -118,7 +126,7 @@ class RunsServiceConfig(ServiceConfig, ConfiguratorMixin):

record_cls = Run
search = RunSearchOptions
schema = JobSchema
schema = RunSchema

permission_policy_cls = FromConfig(
"JOBS_RUNS_PERMISSION_POLICY",
Expand All @@ -129,9 +137,9 @@ class RunsServiceConfig(ServiceConfig, ConfiguratorMixin):
result_list_cls = results.List

links_item = {
"self": JobLink("{+api}/jobs/{job_id}/runs/{run_id}"),
"stop": JobLink("{+api}/jobs/{job_id}/runs/{run_id}/actions/stop"),
"logs": JobLink("{+api}/jobs/{job_id}/runs/{run_id}/logs"),
"self": RunLink("{+api}/jobs/{job_id}/runs/{id}"),
"stop": RunLink("{+api}/jobs/{job_id}/runs/{id}/actions/stop"),
"logs": RunLink("{+api}/jobs/{job_id}/runs/{id}/logs"),
}

links_search = pagination_links("{+api}/jobs/{job_id}{?args*}")
25 changes: 25 additions & 0 deletions invenio_jobs/services/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,28 @@ def __init__(self, id):
super().__init__(
description=_("Job with ID {id} does not exist.").format(id=id)
)


class RunNotFoundError(JobsError):
"""Run not found error."""

def __init__(self, id, job_id=None):
"""Initialise error."""
description = _("Run with ID {id} does not exist.")
if job_id:
description = _("Run with ID {id} for job {job_id} does not exist.")
super().__init__(description=description.format(id=id, job_id=job_id))


class RunStatusChangeError(JobsError):
"""Run status change error."""

def __init__(self, run, new_status):
"""Initialise error."""
self.run = run
self.new_status = new_status
super().__init__(
description=_("You cannot change run status from {old} to {new}.").format(
old=run.status, new=new_status
)
)
7 changes: 6 additions & 1 deletion invenio_jobs/services/links.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ class RunLink(Link):
@staticmethod
def vars(record, vars):
"""Variables for the URI template."""
vars.update({"id": str(record.id), "job_id": str(record.job_id)})
vars.update(
{
"id": str(record.id),
"job_id": str(record.job_id),
}
)


def pagination_links(tpl):
Expand Down
1 change: 1 addition & 0 deletions invenio_jobs/services/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ class RunPermissionPolicy(BasePermissionPolicy):
can_read = [Administration()]
can_update = [Administration()]
can_delete = [Administration()]
can_stop = [Administration()]
37 changes: 36 additions & 1 deletion invenio_jobs/services/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from collections.abc import Iterable, Sized

from flask_sqlalchemy import Pagination
from invenio_records_resources.services.records.results import RecordItem, RecordList
from invenio_records_resources.services.records.results import (
ExpandableField,
RecordItem,
RecordList,
)


class Item(RecordItem):
Expand Down Expand Up @@ -71,3 +75,34 @@ def hits(self):
link.expand(self._identity, hit, projection)

yield projection


class ModelExpandableField(ExpandableField):
"""Expandable entity resolver field.
It will use the Entity resolver registry to retrieve the service to
use to fetch records and the fields to return when serializing
the referenced record.
"""

entity_proxy = None

def ghost_record(self, value):
"""Return ghost representation of not resolved value."""
return self.entity_proxy.ghost_record(value)

def system_record(self):
"""Return the representation of a system user."""
return self.entity_proxy.system_record()

def get_value_service(self, value):
"""Return the value and the service via entity resolvers."""
self.entity_proxy = ResolverRegistry.resolve_entity_proxy(value)
v = self.entity_proxy._parse_ref_dict_id()
_resolver = self.entity_proxy.get_resolver()
service = _resolver.get_service()
return v, service

def pick(self, identity, resolved_rec):
"""Pick fields defined in the entity resolver."""
return self.entity_proxy.pick_resolved_fields(identity, resolved_rec)
Loading

0 comments on commit c6efffb

Please sign in to comment.