Skip to content

Commit

Permalink
* Tasks are queued, executed and updated as planned
Browse files Browse the repository at this point in the history
* clients have to ask for an update from time to time until the
  session is finished (see "local_request.py")
  • Loading branch information
Tom Gebhardt authored and Tom Gebhardt committed Nov 9, 2023
1 parent 3715c83 commit 4b40d28
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 55 deletions.
7 changes: 5 additions & 2 deletions app/celery/automated_tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from .f1_model_persistent_identifier_task import f1_model_persistent_identifier
from .f4_model_metadata_harvestable_task import f4_model_metadata_harvestable
from .csh_f2_persistent_identifier_task import csh_f2_persistent_identifier
from .csh_fair import csh_f1_2_globally_unique_identifier, csh_a1_contains_access_information



# Public API of the automated-task package.
# ``__all__`` has to hold the exported *names* as strings: listing the task
# objects themselves makes ``from <package> import *`` fail with a TypeError.
# Every name listed here must be imported at the top of this module.
__all__ = [
    "f1_model_persistent_identifier",
    "f4_model_metadata_harvestable",
    "csh_f2_persistent_identifier",
    "csh_f1_2_globally_unique_identifier",
    "csh_a1_contains_access_information",
]
154 changes: 154 additions & 0 deletions app/celery/automated_tasks/csh_fair.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import re
import requests

#from typing import Optional

from .csh_helpers import check_route
from app.dependencies.settings import get_settings
from ... import models

from app.celery.celery_app import app

config = get_settings()

def is_doi(identifier):
    """Return True when *identifier* is syntactically a bare DOI.

    A bare DOI has the form ``10.<4-9 digits>/<suffix>`` (no ``doi:`` prefix
    and no resolver URL). DOIs are case-insensitive, so lower-case suffixes
    are accepted as well.

    :param identifier: value to check; anything that is not a string
        (e.g. the ``False`` that ``check_route`` returns for a missing key)
        is reported as "not a DOI" instead of raising.
    :return: ``True`` if the whole string matches the DOI pattern.
    """
    if not isinstance(identifier, str):
        return False

    doi_pattern = r'10\.\d{4,9}/[-._;()/:A-Z0-9]+'
    # fullmatch anchors both ends (a trailing newline is rejected, unlike "$");
    # ASCII keeps IGNORECASE from matching non-ASCII look-alikes of A-Z.
    flags = re.IGNORECASE | re.ASCII
    return re.fullmatch(doi_pattern, identifier, flags=flags) is not None



def incoperate_results(task_dict: dict, result: str, test: bool):
    """Report the computed status of an automated task.

    :param task_dict: task representation; the keys ``"session_id"`` and
        ``"id"`` are read from it.
    :param result: status value passed to ``models.TaskStatus``; the callers
        in this module use ``"success"`` and ``"failed"``.
    :param test: when true, nothing is sent and the status is returned.
    :return: ``models.TaskStatus(result)`` in test mode, otherwise ``None``
        after the PATCH request has been sent.
    """
    print("incoperate results!")
    session_id = task_dict["session_id"]
    task_id = task_dict["id"]

    # NOTE(review): config.celery_key is sent as ``force_update`` and looks
    # like a shared secret, so it is deliberately not printed here.
    status = models.TaskStatusIn(
        status=models.TaskStatus(result), force_update=config.celery_key
    )

    print(f"Task status computed: {result}")
    if test:
        print("test is true")
        return models.TaskStatus(result)

    # Needs to send a request for the task to be updated
    url = f"http://{config.backend_url}:{config.backend_port}/session/{session_id}/tasks/{task_id}"
    print(f"Patching {url}")
    # Without a timeout an unreachable backend would block the worker forever.
    requests.patch(
        url,
        json=status.dict(),
        timeout=10,
    )

# Does not work because celery does not have access to fair_indicators
# routers.update_task(session_id, task_id, status)

# Works, but does not trigger updating of children
# redis_app.json().set(f"session:{session_id}", f".tasks.{task_id}.status", obj=result)



@app.task
def csh_f1_2_globally_unique_identifier(
    task_dict: dict, data: dict, test: bool = False
):
    """
    Celery task: check that the resource has a globally unique identifier.

    The value found at ``resource -> resource_identifier`` counts as globally
    unique when it is a DOI (see ``is_doi``) or starts with ``"DRKS"``.
    A missing or non-string identifier is evaluated as ``"failed"``.

    NOTE(review): ``check_route`` parses ``data`` with ``json.loads``, so a
    JSON string seems to be expected despite the ``dict`` annotation - confirm.

    :param task_dict: Task dict representation
    :param data: (Meta)Data to evaluate
    :param test: passed on to ``incoperate_results``; when true no request
        is sent to the backend
    :return: whatever ``incoperate_results`` returns (the task status in
        test mode, otherwise None)
    """
    print("f1_2_glob")

    identifier = check_route(data, ["resource", "resource_identifier"])

    # could also retrieve "type" from data instead of using .startswith
    if not isinstance(identifier, str):
        # check_route returns False when a key on the route is missing
        result = "failed"
    elif is_doi(identifier) or identifier.startswith("DRKS"):
        result = "success"
    else:
        result = "failed"

    return incoperate_results(task_dict, result, test)


# @app.task
# def csh_f1_1_persistent_identifier(task_dict: dict, data: dict, test: bool = False):

# """
# Task to test whether an identifier is persistent.
# Since the identifier is unique for the CSH, it is persistent
# """

# result = "success"

# incoperate_results(task_dict, result, test)

# @app.task
# def csh_f2_rich_metadata_provided(task_dict: dict, data: dict, test: bool = False):
# """
# The nature of the CSH with all its mandatory fields implies a success
# """

# result = "success"

# incoperate_results(task_dict, result, test)

# @app.task
# def csh_f3_id_of_do_included(task_dict: dict, data: dict, test: bool = False):
# """
# we are unsure about this indicator. At the moment we consider it as a fail
# """

# result = "success"

# incoperate_results(task_dict, result, test)


# @app.task
# def csh_f4_metadata_indexed(task_dict: dict, data: dict, test: bool = False):
# """
# since the data is sent to our tool as a json it clearly is indexed
# """

# result = "success"

# incoperate_results(task_dict, result, test)

@app.task
def csh_a1_contains_access_information(task_dict: dict, data: dict, test: bool = False):
    """
    Celery task: check that the metadata states a plan to share the data.

    1. check if there is a data sharing plan
    2. if yes -> evaluate 'study_data_sharing_plan_time_frame' and
       'study_data_sharing_plan_access_criteria' somehow (not implemented yet)

    :param task_dict: Task dict representation
    :param data: (Meta)Data to evaluate
    :param test: passed on to ``incoperate_results``; when true no request
        is sent to the backend
    :return: whatever ``incoperate_results`` returns (the task status in
        test mode, otherwise None)
    """
    # NOTE(review): the value compared below reads like an answer to a
    # "study_data_sharing_plan_generally" field, but the route ends in
    # "study_data_sharing_plan_description" - confirm against the CSH schema.
    general_plan = check_route(
        data,
        [
            "resource",
            "study_design",
            "study_data_sharing_plan",
            "study_data_sharing_plan_description",
        ],
    )
    print("INFO - general plan - ", general_plan)

    has_plan = general_plan == "Yes, there is a plan to make data available"

    if has_plan:
        print("TODO: implent a check of the actual data sharing plan")
        result = "success"
    else:
        result = "failed"

    # Hand the status back so that test mode has an observable result.
    return incoperate_results(task_dict, result, test)
14 changes: 14 additions & 0 deletions app/celery/automated_tasks/csh_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import json

def check_route(metadata, route_keys):
    """Follow *route_keys* through nested metadata and return the value found.

    :param metadata: the metadata either as JSON text (``str``/``bytes``) or
        as an already decoded object such as a ``dict``.
    :param route_keys: keys to follow, outermost first.
    :return: the value at the end of the route, or ``False`` when a key is
        missing or an intermediate value is not a dict. Note that a stored
        ``False`` cannot be told apart from a missing route.
    :raises json.JSONDecodeError: if *metadata* is text but not valid JSON.
    """
    # Accept decoded metadata as well as the raw JSON text.
    if isinstance(metadata, (str, bytes, bytearray)):
        current_position = json.loads(metadata)
    else:
        current_position = metadata

    for key in route_keys:
        # Only dicts can be descended into; a scalar, string or list on the
        # route means the requested path does not exist.
        if not isinstance(current_position, dict) or key not in current_position:
            return False
        current_position = current_position[key]

    # if the route exists return the value
    return current_position
13 changes: 11 additions & 2 deletions app/metrics/metrics.csv
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,14 @@
"CA-RDA-R1.3-02MM","Essential","Metadata of model is expressed in compliance with a machine-understandable community standard","This indicator requires that the model metadata follows a community standard that has a machine-understandable expression","This indicator can be evaluated by verifying that the community standard used f or the metadata has a machine-understandable expression"
"CA-RDA-R1.3-03MA","Important","Metadata of archive is expressed in compliance with a machine-understandable cross-community standard","This indicator requires that the archive metadata follows a cross-community standard that has a machine-understandable expression","This indicator can be evaluated by verifying that the cross-community standard used f or the metadata has a machine-understandable expression"
"CA-RDA-R1.3-03MM","Important","Metadata of model is expressed in compliance with a machine-understandable cross-community standard","This indicator requires that the model metadata follows a cross-community standard that has a machine-understandable expression","This indicator can be evaluated by verifying that the cross-community standard used f or the metadata has a machine-understandable expression"
"CSH-RDA-F1-01","Important","Is a unique identifier assigned to the study","This indicator is the inital test indicator for evaluating elemts from the CSH","And some more details","??"
"CSH-RDA-F2-01","Essential","Something else","And some more details","??"
"CSH-RDA-F1-01M","Essential","Metadata is identified by a persistent identifier","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-F1-02M","Essential","Metadata is identified by a globally unique identifier","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-F2-01M","Essential","Rich metadata is provided to allow discovery","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-F3-01M","Essential","Metadata includes the identifier for the data","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-F4-01M","Essential","Metadata is offered in such a way that it can be harvested and indexed","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-A1-01M","Important","Metadata contains information to enable the user to get access to the data","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-A1-02M","Essential","Metadata can be accessed manually","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-A1-03M","Essential","Metadata identifier resolves to a metadata record","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-A1-04M","Essential","Metadata is accessed through standardised protocol","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-A1.1-01M","Essential","Metadata is accessible through a free access protocol","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
"CSH-RDA-A2-01M","Essential","Metadata is guaranteed to remain available after data is no longer available","---Add a description about the indicator (probably adapt from RDA schema)---","Add an explanation on how this indicator is technically evaluated"
6 changes: 0 additions & 6 deletions app/models/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ class SessionSubjectIn(BaseModel):

@validator("subject_type", always=True)
def necessary_data_provided(cls, subject_type: str, values: dict):
print(values)
if subject_type is SubjectType.manual:
if (
values.get("has_archive") is None
Expand Down Expand Up @@ -205,9 +204,6 @@ def __init__(self, session: Session) -> None:
if self.user_input.subject_type in [SubjectType.file, SubjectType.url]: #url is currently not supported, thus this step wouldn't be reached for URL support
self.assessed_data = self.retrieve_data(self.user_input.path)
elif self.user_input.subject_type is SubjectType.csh:
print("---")
print(self.user_input)
print("---")
self.assessed_data = self.user_input.metadata
self.create_tasks()

Expand Down Expand Up @@ -484,7 +480,6 @@ def _get_default_task_status(self, indicator: str) -> tuple[TaskStatus, bool]:
return TaskStatus(config.pmr_assessment_status[indicator]), True

if indicator in config.csh_metadata_status:
print("+++++++HUHU++++++")
return TaskStatus(config.csh_metadata_status[indicator]), True

if indicator in config.assessment_dependencies:
Expand Down Expand Up @@ -591,7 +586,6 @@ def start_automated_tasks(self):
if self.user_input.subject_type is not SubjectType.csh:
task.do_evaluate(self.assessed_data.dict())
else:
print("???????????????")
task.do_evaluate(self.assessed_data)

def json(self):
Expand Down
43 changes: 0 additions & 43 deletions app/routers/csh_router.py

This file was deleted.

15 changes: 13 additions & 2 deletions tests/local_request.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import requests
import json
import time

print("TESTING THE SERVER")

# Define the URL of the local server
url = 'http://localhost:8000/session'

# Define the data you want to send in the POST request (as a dictionary)
# metadata that will be used for development
metadata = {
"link": None,
"resource": {
Expand Down Expand Up @@ -346,7 +347,17 @@
# Check the response
if response.status_code == 200:
    print("Request was successful.")

    # Decode the body once instead of re-parsing it for every field access.
    session = response.json()
    print("Response:", session['status'])
    time.sleep(3)
    print("should be 3 seconds delayed")

    # Poll the session until it is finished. The number of polls is bounded
    # so the script terminates even if the session never reaches 'finished'.
    session_url = url + '/' + session['id']
    max_polls = 100
    polls = 0
    while session['status'] != 'finished' and polls < max_polls:
        time.sleep(3)
        response = requests.get(session_url, timeout=10)
        # Fail loudly on an error response instead of a KeyError on 'status'.
        response.raise_for_status()
        session = response.json()
        print('status: ', session['status'])
        polls += 1

    if session['status'] != 'finished':
        print("Session did not finish after", max_polls, "polls; giving up.")

    print(session)
else:
    print("Request failed with status code:", response.status_code)
    print(response.text)

0 comments on commit 4b40d28

Please sign in to comment.