forked from machine-learning-apps/actions-app-token
-
Notifications
You must be signed in to change notification settings - Fork 0
/
token_getter.py
152 lines (118 loc) · 5.7 KB
/
token_getter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from collections import namedtuple, Counter
from github3 import GitHub
from pathlib import Path
from cryptography.hazmat.backends import default_backend
import time
import json
import jwt
import requests
from typing import List
import os
class GitHubApp(GitHub):
"""
This is a small wrapper around the github3.py library
Provides some convenience functions for testing purposes.
"""
def __init__(self, pem_path, app_id, nwo):
super().__init__()
self.app_id = app_id
self.path = Path(pem_path)
self.app_id = app_id
if not self.path.is_file():
raise ValueError(f'argument: `pem_path` must be a valid filename. {pem_path} was not found.')
self.nwo = nwo
def get_app(self):
with open(self.path, 'rb') as key_file:
client = GitHub()
client.login_as_app(private_key_pem=key_file.read(),
app_id=self.app_id)
return client
def get_installation(self, installation_id):
"login as app installation without requesting previously gathered data."
with open(self.path, 'rb') as key_file:
client = GitHub()
client.login_as_app_installation(private_key_pem=key_file.read(),
app_id=self.app_id,
installation_id=installation_id)
return client
def get_test_installation_id(self):
"Get a sample test_installation id."
client = self.get_app()
return next(client.app_installations()).id
def get_test_installation(self):
"login as app installation with the first installation_id retrieved."
return self.get_installation(self.get_test_installation_id())
def get_test_repo(self):
repo = self.get_all_repos(self.get_test_installation_id())[0]
appInstallation = self.get_test_installation()
owner, name = repo['full_name'].split('/')
return appInstallation.repository(owner, name)
def get_test_issue(self):
test_repo = self.get_test_repo()
return next(test_repo.issues())
def get_jwt(self):
"""
This is needed to retrieve the installation access token (for debugging).
Useful for debugging purposes. Must call .decode() on returned object to get string.
"""
now = self._now_int()
payload = {
"iat": now,
"exp": now + (60),
"iss": self.app_id
}
with open(self.path, 'rb') as key_file:
private_key = default_backend().load_pem_private_key(key_file.read(), None)
return jwt.encode(payload, private_key, algorithm='RS256')
def get_installation_id(self):
"https://developer.github.com/v3/apps/#find-repository-installation"
owner, repo = self.nwo.split('/')
url = f'https://api.github.com/repos/{owner}/{repo}/installation'
headers = {'Authorization': f'Bearer {self.get_jwt().decode()}',
'Accept': 'application/vnd.github.machine-man-preview+json'}
response = requests.get(url=url, headers=headers)
if response.status_code != 200:
raise Exception(f'Status code : {response.status_code}, {response.json()}')
return response.json()['id']
def get_installation_access_token(self, installation_id):
"Get the installation access token for debugging."
url = f'https://api.github.com/app/installations/{installation_id}/access_tokens'
headers = {'Authorization': f'Bearer {self.get_jwt().decode()}',
'Accept': 'application/vnd.github.machine-man-preview+json'}
response = requests.post(url=url, headers=headers)
if response.status_code != 201:
raise Exception(f'Status code : {response.status_code}, {response.json()}')
return response.json()['token']
def _extract(self, d, keys):
"extract selected keys from a dict."
return dict((k, d[k]) for k in keys if k in d)
def _now_int(self):
return int(time.time())
def get_all_repos(self, installation_id):
"""Get all repos that this installation has access to.
Useful for testing and debugging.
"""
url = 'https://api.github.com/installation/repositories'
headers={'Authorization': f'token {self.get_installation_access_token(installation_id)}',
'Accept': 'application/vnd.github.machine-man-preview+json'}
response = requests.get(url=url, headers=headers)
if response.status_code >= 400:
raise Exception(f'Status code : {response.status_code}, {response.json()}')
fields = ['name', 'full_name', 'id']
return [self._extract(x, fields) for x in response.json()['repositories']]
def generate_installation_curl(self, endpoint):
iat = self.get_installation_access_token()
print(f'curl -i -H "Authorization: token {iat}" -H "Accept: application/vnd.github.machine-man-preview+json" https://api.github.com{endpoint}')
if __name__ == '__main__':
pem_path = 'pem.txt'
app_id = os.getenv('INPUT_APP_ID')
nwo = os.getenv('GITHUB_REPOSITORY')
assert pem_path, 'Must supply input APP_PEM'
assert app_id, 'Must supply input APP_ID'
assert nwo, "The environment variable GITHUB_REPOSITORY was not found."
app = GitHubApp(pem_path=pem_path, app_id=app_id, nwo=nwo)
id = app.get_installation_id()
token = app.get_installation_access_token(installation_id=id)
assert token, 'Token not returned!'
print(f"::add-mask::{token}")
print(f"::set-output name=app_token::{token}")