augment.py
# Augment the final project selection with additional required data.
# This script must be run after filter_projects.py has established a final list of projects.
#
# - Get the default branch name of every GitHub project
# - Get workflow run data for all push builds on the default branch of all projects
#
import os
from datetime import timedelta
from typing import Any, Dict, List

from branches import load_default_branches
from coverage import save_coverage
from coveralls_api_client import get_latest_coveralls_report_in_date_range
from data_io import read_dict_from_json_file
from projects import load_projects, load_projects_and_partition
from workflows import encode_workflow_runs_path, load_workflow_runs, load_workflows
from config import (
    MAX_GITHUB_RESULTS_PER_PAGE,
    NUM_PAGES,
    NUM_PARTITIONS_DEFAULT_BRANCH,
    NUM_WORKFLOW_RUNS,
    SUPPORTED_LANGUAGE_GROUPS_MAP
)
from github_api_client import (
    convert_str_to_datetime,
    get_default_branch_for_repos_partitioned,
    get_runs_for_workflow
)


def encode_coveralls_report_path(project_coverage_prefix: str, repo_id: str) -> str:
    """
    Encode a path for the Coveralls report corresponding to a given project. Produces a path
    of the form `data/project_coverage_repo123.json`, which indicates that the file contains
    the Coveralls coverage report for repo 123.
    """
    return f"{project_coverage_prefix}_repo{repo_id}.json"
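
# Illustrative usage of the helper above (kept as a comment so nothing runs on import;
# the prefix and repo ID values here are hypothetical, not taken from the project config):
#
#   encode_coveralls_report_path("data/project_coverage", "123")
#   # -> "data/project_coverage_repo123.json"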


def verify_projects_have_augmented_data(projects: List[Dict[str, str]],
                                        augmented_data_dict: Dict[str, Dict[str, Any]]) -> None:
    """
    Verify that all projects have an associated entry in a dict of augmented data dicts.
    If any project is missing any augmented data, abort program execution.
    """
    num_projects_missing_stuff = 0
    print('Verifying that all projects have required augmented data...')
    # Verify that each project has an entry in all dictionaries containing augmented data
    for project in projects:
        for augmented_data_name, augmented_data in augmented_data_dict.items():
            if project['id'] not in augmented_data:
                print(
                    f"ERROR: Missing {augmented_data_name} for project with ID {project['id']}")
                num_projects_missing_stuff += 1
    # If any project is missing any augmented data, exit
    if num_projects_missing_stuff > 0:
        print('ERROR: One or more projects are missing required augmented data, aborting!')
        exit()
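
# Illustrative shape of the arguments expected above (comment only; the IDs and values are
# hypothetical): `projects` is a list like [{'id': '123', 'owner': 'octocat', 'name': 'hello'}],
# and `augmented_data_dict` maps a label to a dict keyed by project ID, e.g.
# {'workflows': {'123': {...}}, 'default branch': {'123': 'main'}}.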


def load_projects_workflows_branches(projects_path: str, workflows_path: str,
                                     default_branches_path: str):
    """
    Load the projects, along with workflow and default branch data. Verify all augmented data
    before returning the project, workflow, and default branch dictionaries as a tuple.
    """
    projects = load_projects(projects_path, False)
    workflows_dict = load_workflows(workflows_path)
    default_branches_dict = load_default_branches(default_branches_path)
    # Verify that workflows and default branches exist for all projects
    verify_projects_have_augmented_data(
        projects, {'workflows': workflows_dict, 'default branch': default_branches_dict})
    return projects, workflows_dict, default_branches_dict


def get_default_branches_for_projects(projects_path: str, default_branches_path_prefix: str) -> None:
    """
    Retrieve the default branch name for every project (in partitions) and write the results
    to a JSON file. Skips retrieval if the output file already exists.
    """
    print('[!] Retrieving the default branch name for each project')
    default_branches_output_path = f"{default_branches_path_prefix}.json"
    if os.path.isfile(default_branches_output_path):
        print(
            f"[!] {default_branches_output_path} already exists, skipping...")
        return
    partitioned_projects = load_projects_and_partition(
        projects_path, NUM_PARTITIONS_DEFAULT_BRANCH)
    get_default_branch_for_repos_partitioned(
        partitioned_projects, NUM_PARTITIONS_DEFAULT_BRANCH, default_branches_path_prefix)
    print(
        f"[!] Wrote default branch names file to {default_branches_output_path}")
    print('[!] Done retrieving default branch names')


def get_workflow_runs(projects_path: str, workflows_path: str, default_branches_path: str,
                      workflow_runs_prefix: str) -> None:
    """
    Retrieve the most recent workflow runs (push builds on the default branch) for every
    workflow of every project, writing one JSON file of runs per workflow.
    """
    print(
        f"[!] Retrieving the {NUM_WORKFLOW_RUNS} most recent runs for each project workflow")
    projects, workflows_dict, default_branches_dict = load_projects_workflows_branches(
        projects_path, workflows_path, default_branches_path)
    # Get workflow runs for all workflows in all projects
    # NOTE: This will take a while and will likely require restarting due to GitHub API rate limits
    for i, project in enumerate(projects):
        print(f"Getting workflow runs for project {i+1}/{len(projects)}")
        for workflow_idx_str, workflow in workflows_dict[project['id']].items():
            # Get workflow runs for this workflow if we haven't already
            runs_output_path = encode_workflow_runs_path(
                workflow_runs_prefix, project['id'], workflow_idx_str)
            if not os.path.isfile(runs_output_path):
                get_runs_for_workflow(
                    project['owner'],
                    project['name'],
                    default_branches_dict[project['id']],
                    workflow['name'],
                    runs_output_path,
                    NUM_PAGES,
                    MAX_GITHUB_RESULTS_PER_PAGE
                )
    print('[!] Done retrieving workflow runs (no summarized file was written)')


def get_coveralls_info(projects_path: str, workflows_path: str, default_branches_path: str,
                       workflow_runs_prefix: str, project_coverage_prefix: str,
                       language_coverage_path: str) -> None:
    """
    For each project, fetch the latest Coveralls report created within 7 days before its most
    recent workflow run, then aggregate coverage percentages by programming language group and
    write them to a JSON file. Skips retrieval if the aggregated output file already exists.
    """
    print('[!] Retrieving Coveralls code coverage info for each project')
    if os.path.isfile(language_coverage_path):
        print(f"[!] {language_coverage_path} already exists, skipping...")
        return
    reports_found, reports_found_by_lang = 0, {}
    projects, workflows_dict, default_branches_dict = load_projects_workflows_branches(
        projects_path, workflows_path, default_branches_path)
    # Get the Coveralls report for each project
    for i, project in enumerate(projects):
        print(
            f"Getting Coveralls report for project {i+1}/{len(projects)} (# found = {reports_found})")
        # Get SHAs (identifiers) for the head commits of every workflow run
        proj_commits = {}
        for workflow_idx_str, _ in workflows_dict[project['id']].items():
            workflow_runs_path = encode_workflow_runs_path(
                workflow_runs_prefix, project['id'], workflow_idx_str)
            if not os.path.isfile(workflow_runs_path):
                print(
                    f"ERROR: Workflow runs file does not exist at {workflow_runs_path}, aborting!")
                exit()
            workflow_runs = load_workflow_runs(workflow_runs_path)
            for run in workflow_runs:
                proj_commits[run['created_at']] = run['head_sha']
        # Sort the commit SHAs by workflow run date (newest commits first)
        ordered_proj_commits = sorted(
            proj_commits.items(),
            key=lambda x: convert_str_to_datetime(x[0]),
            reverse=True
        )
        coveralls_report_filename = encode_coveralls_report_path(
            project_coverage_prefix, project['id'])
        report = {}
        if len(ordered_proj_commits) == 0:
            print(
                f"ERROR: No commits found for project id {project['id']}, aborting!")
            exit()
        elif not os.path.isfile(coveralls_report_filename):
            # Get the latest Coveralls report created within 7 days before the latest build run
            max_report_date = convert_str_to_datetime(
                ordered_proj_commits[0][0])
            min_report_date = max_report_date - timedelta(days=7)
            report = get_latest_coveralls_report_in_date_range(
                project['owner'],
                project['name'],
                default_branches_dict[project['id']],
                min_report_date,
                max_report_date,
                output_filename=coveralls_report_filename
            )
        else:
            # Report has already been retrieved, so read it from disk
            report = read_dict_from_json_file(coveralls_report_filename)
        # Aggregate coverage by programming language group
        if report:
            reports_found += 1
            language_group = SUPPORTED_LANGUAGE_GROUPS_MAP[project['language']]
            if language_group not in reports_found_by_lang:
                reports_found_by_lang[language_group] = []
            reports_found_by_lang[language_group].append(
                report['covered_percent'])
    print(
        f"Found Coveralls reports for {reports_found}/{len(projects)} projects")
    # Write project language coverage to a JSON file (projects lacking a Coveralls report are omitted)
    save_coverage(reports_found_by_lang, language_coverage_path)
    for lang, coverages in reports_found_by_lang.items():
        print(f"Found {len(coverages)} coverage reports for {lang} projects")
    print('[!] Done retrieving Coveralls code coverage info')
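

# Minimal driver sketch (an assumption, not part of the original file): the original pipeline
# presumably wires these steps together elsewhere. The paths and prefixes below are hypothetical
# placeholders, shown only to illustrate the expected call order described in the module header.
if __name__ == '__main__':
    projects_path = 'data/projects_final.json'              # hypothetical output of filter_projects.py
    default_branches_prefix = 'data/default_branches'       # hypothetical output prefix
    workflows_path = 'data/workflows.json'                  # hypothetical workflows file
    workflow_runs_prefix = 'data/workflow_runs'             # hypothetical output prefix
    project_coverage_prefix = 'data/project_coverage'       # hypothetical output prefix
    language_coverage_path = 'data/language_coverage.json'  # hypothetical aggregated output

    # Step 1: default branch names (written to f"{default_branches_prefix}.json")
    get_default_branches_for_projects(projects_path, default_branches_prefix)
    # Step 2: workflow runs for every workflow on each project's default branch
    get_workflow_runs(projects_path, workflows_path,
                      f"{default_branches_prefix}.json", workflow_runs_prefix)
    # Step 3: Coveralls coverage reports, aggregated by language group
    get_coveralls_info(projects_path, workflows_path, f"{default_branches_prefix}.json",
                       workflow_runs_prefix, project_coverage_prefix, language_coverage_path)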