-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.py
158 lines (125 loc) · 4.6 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import gevent
from gevent import monkey
monkey.patch_all()
import json
import os
import re
import requests
from simple_salesforce import Salesforce
import sys
from time import sleep
# env file functions derrived from honcho
def __parse_env(content):
for line in content.splitlines():
m1 = re.match(r'\A([A-Za-z_0-9]+)=(.*)\Z', line)
if m1:
key, val = m1.group(1), m1.group(2)
# handle single quote enclosed value
m2 = re.match(r"\A'(.*)'\Z", val)
if m2:
val = m2.group(1)
# handle double quote enclosed value
m3 = re.match(r'\A"(.*)"\Z', val)
if m3:
val = re.sub(r'\\(.)', r'\1', m3.group(1))
os.environ[key] = val
def __read_env_file():
if os.path.exists('.env'):
try:
with open('.env') as f:
__parse_env(f.read())
except IOError:
# you don't have to have an env file
pass
global sf
def _connect():
global sf, base_url, headers
# start one connection so everyone can use it
sf = Salesforce(username=os.environ.get('USERNAME', ''),
password=os.environ.get('PASSWORD', ''),
security_token=os.environ.get('TOKEN', ''),
sandbox=os.environ.get('SANDBOX', '') != '')
base_url = 'https://' + sf.sf_instance + \
'/services/data/v28.0/sobjects/ApexTestQueueItem'
headers = {"Authorization": "Bearer " + sf.session_id,
'Content-type': 'application/json'}
def _queue_test(class_id):
res = requests.post(base_url,
data=json.dumps({"ApexClassId": class_id}),
headers=headers)
if 'Apex test class already enqueued' in res.content:
print "%s is already scheduled for testing, sleeping..." % class_id
print class_id
print res.content
sleep(10)
return _queue_test(class_id)
return res.json()['id']
def _query(soql):
url = 'https://'+sf.sf_instance+'/services/data/v28.0/query/'
params = {"q": soql}
res = requests.get(url, headers=headers, params=params)
assert res.status_code == 200
return res.json()['records']
def _check_test_result(test_id):
res = requests.get(base_url + '/' + test_id, headers=headers)
res = res.json()
if res['Status'] not in ('Queued', 'Processing'):
assert res['Status'] == 'Completed', 'bad status: %s' % res['Status']
items = res['ExtendedStatus'][1:-1].split('/')
if items[0] != items[1]:
print "ERROR"
res2 = _query(("select MethodName, Message, StackTrace "
"from ApexTestResult "
"where Outcome != 'Pass' "
"and AsyncApexJobId = '%s'" % res['ParentJobId']))
for r in res2:
print "*" * 78
print r['Message']
print r['StackTrace']
print "*" * 78
print test_id, res['ExtendedStatus']
return True
def _test(class_id):
test_id = _queue_test(class_id)
for i in range(30):
gevent.sleep(i*10)
if _check_test_result(test_id):
return
print "the test for class_id %s took too long, exiting" % class_id
def _last_apex_class_change():
last_soql = ("select SystemModstamp from ApexClass "
"order by SystemModstamp desc limit 1")
res = _query(last_soql)
if res:
return res[0]['SystemModstamp']
def _sleep_until_new_change(last, sleep_duration=2):
for i in range(5):
sleep(sleep_duration)
new_last = _last_apex_class_change()
if new_last != last:
return new_last
if sleep_duration < 100:
_sleep_until_new_change(last, sleep_duration * 2)
else:
# about 5 mins
print "No new tests for too long, shutting down to save api calls"
exit(1)
def class_ids(pattern):
soql = ("select Id, Name, SystemModstamp "
"from ApexClass where Name like '%s'")
for r in _query(soql % pattern):
yield r['Id']
def _tests(pattern):
return [gevent.spawn(_test, class_id) for class_id in class_ids(pattern)]
def process_forever(pattern):
last = _last_apex_class_change()
while True:
gevent.joinall(_tests(pattern))
last = _sleep_until_new_change(last)
print "Change detected, running again...."
if __name__ == "__main__":
__read_env_file()
_connect()
pattern = '%test%' if len(sys.argv) < 2 else sys.argv[1]
print 'Running tests matching "%s"' % pattern
process_forever(pattern)