-
Notifications
You must be signed in to change notification settings - Fork 3
/
api.py
92 lines (67 loc) · 2.62 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import json
import os
from json import JSONDecodeError
from urllib import error, request
import requests
import uvicorn
from fastapi import Body, FastAPI, Response
from requests import RequestException
from structlog import get_logger
from app.validators.questionnaire_validator import QuestionnaireValidator
app = FastAPI()
@app.get("/status")
async def status():
    """Health-check endpoint: reply with a bare 200 response."""
    ok_response = Response(status_code=200)
    return ok_response
# Host of the AJV validator service; overridable through the AJV_HOST
# environment variable and falling back to the local machine.
AJV_HOST = os.environ.get("AJV_HOST", "localhost")

# Endpoint that validate_schema() posts schemas to.
AJV_VALIDATOR_URL = f"http://{AJV_HOST}:5002/validate"

# Module-wide structlog logger shared by every handler below.
logger = get_logger()
@app.post("/validate")
async def validate_schema_request_body(payload=Body(None)):
    """Validate the schema supplied in the body of a POST request.

    Args:
        payload: The request body; ``None`` when no body was sent.

    Returns:
        Whatever ``validate_schema`` returns for the payload.
    """
    logger.info("Validating schema")
    validation_result = await validate_schema(payload)
    return validation_result
@app.get("/validate")
async def validate_schema_from_url(url=None):
    """Download a schema from ``url`` and validate it.

    Args:
        url: Location of the schema to fetch, taken from the query string.

    Returns:
        * A 400 ``Response`` when no ``url`` was supplied.
        * A 404 ``Response`` when the URL cannot be opened, read or decoded.
        * Otherwise whatever ``validate_schema`` returns for the downloaded text.
    """
    # Without this guard the handler fell through and returned None, which
    # the framework reported as a successful (200) "null" response.
    if not url:
        logger.info("No schema URL supplied", status=400)
        return Response(status_code=400, content="No schema URL supplied")

    logger.info("Validating schema from URL", url=url)

    # NOTE(security): `url` is caller-supplied and handed straight to
    # urlopen, which also accepts non-HTTP schemes such as `file:` and can
    # reach hosts internal to the deployment. Review whether the scheme/host
    # should be restricted before exposing this endpoint to untrusted users.
    try:
        # Bound the download so an unresponsive host cannot hang the request.
        with request.urlopen(url, timeout=10) as opened_url:
            schema_text = opened_url.read().decode()
    except (OSError, ValueError):
        # OSError covers URLError/HTTPError and socket timeouts; ValueError
        # covers malformed URLs and bodies that are not valid text.
        return Response(
            status_code=404, content=f"Could not load schema at URL [{url}]"
        )

    # Validation happens outside the try block so that its own failures are
    # never misreported as "could not load".
    return await validate_schema(data=schema_text)
async def validate_schema(data):
    """Validate a questionnaire schema and report the outcome as a response.

    The schema is first posted to the AJV validator service at
    ``AJV_VALIDATOR_URL``. Only when that service reports nothing is the
    schema passed to ``QuestionnaireValidator``.

    Args:
        data: The schema, either as a JSON string or as an already-parsed
            dict. Any other (or empty) value is forwarded as ``None``.

    Returns:
        A ``Response`` carrying:
        * 400 and the text "Could not parse JSON" when a string input is not
          valid JSON;
        * 400 and ``{"errors": ...}`` when either validator reports errors;
        * 503 and ``{"error": ...}`` when the AJV service cannot be reached
          or its reply is not JSON;
        * 200 and ``{}`` when validation passes.
    """
    json_to_validate = None
    if data:
        if isinstance(data, str):
            try:
                json_to_validate = json.loads(data)
            except JSONDecodeError:
                logger.info("Could not parse JSON", status=400)
                return Response(status_code=400, content="Could not parse JSON")
        elif isinstance(data, dict):
            json_to_validate = data

    # NOTE: requests.post is synchronous, so this coroutine blocks while
    # waiting on the AJV service (bounded by the 10 second timeout).
    try:
        ajv_response = requests.post(
            AJV_VALIDATOR_URL, json=json_to_validate, timeout=10
        )
        ajv_response_dict = ajv_response.json()
    except (RequestException, ValueError):
        # ValueError covers a non-JSON reply on requests versions whose
        # decode error is not a RequestException.
        logger.info("AJV Schema validator service unavailable")
        # The previous json.dumps(obj={}, error=...) call raised TypeError,
        # turning this path into an unhandled server error.
        return Response(
            content=json.dumps(
                {"error": "AJV Schema validator service unavailable"}
            ),
            status_code=503,
        )

    # Any non-empty reply from the AJV service is treated as a failure report.
    if ajv_response_dict:
        logger.info("Schema validator returned errors", status=400)
        # Build an explicit Response: returning a ``(body, status)`` tuple is
        # serialised as a JSON array with status 200 instead of setting 400.
        return Response(
            content=json.dumps({"errors": ajv_response_dict["errors"]}),
            status_code=400,
        )

    validator = QuestionnaireValidator(json_to_validate)
    validator.validate()

    if validator.errors:
        logger.info("Questionnaire validator returned errors", status=400)
        return Response(
            content=json.dumps({"errors": validator.errors}), status_code=400
        )

    logger.info("Schema validation passed", status=200)
    return Response(content=json.dumps({}), status_code=200)
if __name__ == "__main__":
    # Serve the app with uvicorn when this file is executed directly.
    # NOTE(review): uvicorn documents `workers` and `reload` as mutually
    # exclusive options - confirm which of the two is actually intended.
    uvicorn.run(
        "api:app",
        port=5001,
        workers=20,
        reload=True,
    )