forked from RedHatProductSecurity/osidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
124 lines (108 loc) · 4.18 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
collector API endpoints
"""
import logging
from drf_spectacular.utils import extend_schema
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from osidb.api_views import RudimentaryUserPathLoggingMixin
from .models import CollectorFramework
# module-level logger used by every collector API view defined below
logger = logging.getLogger(__name__)
class index(RudimentaryUserPathLoggingMixin, APIView):
    """API view enumerating the collector API endpoints"""

    @extend_schema(
        responses={
            200: {
                "type": "object",
                "properties": {
                    "index": {"type": "array", "items": {"type": "string"}},
                },
            }
        }
    )
    def get(self, request, *args, **kwargs):
        """
        list the URL patterns registered in the sibling urls module

        every pattern is rendered with a leading slash and
        the resulting list is returned under the "index" key
        """
        logger.info("getting index")

        # imported at call time rather than at module import time
        # NOTE(review): presumably avoids a circular import as
        # the urls module would reference these views - confirm
        from .urls import urlpatterns

        endpoints = [f"/{route.pattern}" for route in urlpatterns]
        return Response({"index": endpoints})
class healthy(RudimentaryUserPathLoggingMixin, APIView):
    """collector health check API endpoint which needs no authentication"""

    # the health check has to be reachable without any credentials
    permission_classes = [AllowAny]

    def get(self, request, *args, **kwargs):
        """
        health check API endpoint available without authentication

        logs the call and replies with an empty response
        """
        logger.info("getting health status")
        empty_reply = Response()
        return empty_reply
class status(RudimentaryUserPathLoggingMixin, APIView):
    """collector status API endpoint"""

    @extend_schema(
        responses={
            200: {
                "type": "object",
                "properties": {
                    # the response body maps every collector name to its
                    # status so the schema is a keyed object and not an array
                    "collectors": {
                        "type": "object",
                        "additionalProperties": {
                            "type": "object",
                            "properties": {
                                "data": {
                                    "type": "string",
                                    "enum": ["EMPTY", "PARTIAL", "COMPLETE"],
                                },
                                "depends_on": {
                                    "type": "array",
                                    "items": {"type": "string"},
                                },
                                "error": {"type": "object", "nullable": True},
                                "is_complete": {"type": "boolean"},
                                "is_up2date": {"type": "boolean"},
                                "data_models": {
                                    "type": "array",
                                    "items": {"type": "string"},
                                },
                                "state": {
                                    "type": "string",
                                    "enum": [
                                        "PENDING",
                                        "BLOCKED",
                                        "READY",
                                        "RUNNING",
                                    ],
                                },
                                "updated_until": {
                                    "type": "string",
                                    "format": "date-time",
                                },
                            },
                        },
                    },
                },
            }
        }
    )
    def get(self, request, *args, **kwargs):
        """
        get the overall status of all collectors and the collected data

        the response holds a single "collectors" key whose value maps
        the collector name to a dictionary of its status attributes

        only the collectors with "tasks" in their name are reported
        """
        logger.info("getting collector status")
        return Response(
            {
                "collectors": {
                    # TODO implement collector serializer
                    name: {
                        "data": collector.data_state,
                        "depends_on": collector.depends_on,
                        "error": collector.error,
                        "is_complete": collector.is_complete,
                        "is_up2date": collector.is_up2date,
                        "data_models": collector.data_models,
                        "state": collector.collector_state,
                        "updated_until": collector.updated_until_dt,
                    }
                    # the official collectors are always in the tasks directory
                    for name, collector in CollectorFramework.collectors().items()
                    if "tasks" in name
                },
            }
        )