forked from kelvinBen/DrozerFuzz
-
Notifications
You must be signed in to change notification settings - Fork 0
/
AutoTest.py
300 lines (255 loc) · 13.6 KB
/
AutoTest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# -*- coding: UTF-8 -*-
import os
import re
import time
from DrozerServer import DrozerServer
class Fuzz(object):
    """Enumerate and exercise Android app components through a drozer session.

    Output of drozer commands is cached as text files below a caller-supplied
    working directory; a command is only re-run when its file does not exist
    yet. Device-side actions (port forwarding, screenshots) go through ``adb``.
    """

    # Component kinds queried by default (used as ``app.<kind>.info``).
    components_types = ["activity", "service", "broadcast", "provider"]
    # Provider scanners run by default (used as ``scanner.provider.<name>``).
    scanner_types = ["traversal", "injection"]

    # drozer module suffixes exercised per component kind; every other kind
    # (in practice "provider") falls back to _PROVIDER_MODULES.
    _MODULES_BY_TYPE = {
        "activity": ["start"],
        "service": ["start", "stop"],
        "broadcast": ["send"],
    }
    _PROVIDER_MODULES = ["projection", "selection", ""]

    # Prefix "<digit>@_@" that scannerProvider puts in front of content URIs.
    _PROVIDER_TAG = re.compile(r'[0-9]@_@')

    # Space-stripped drozer messages meaning "nothing to list for this app".
    _NO_COMPONENT_MARKERS = (
        "Noexportedservices.",
        "Nomatchingreceivers.",
        "Nomatchingactivities.",
        "Nomatchingproviders.",
    )

    def __init__(self, _ip, _port, _scanner_types, _components_types):
        """Open the drozer session and prepare the device.

        :param _ip: address of the drozer server / adb device.
        :param _port: drozer server port (also forwarded through adb).
        :param _scanner_types: provider scanners to run; a falsy value keeps
            the class default.
        :param _components_types: component kinds to inspect; a falsy value
            keeps the class default.
        """
        ip_adrr = "%s:%s" % (_ip, _port)
        self.session = DrozerServer(ip_adrr).getSession()
        self.initConfig(_ip, _port)
        if _scanner_types:
            self.scanner_types = _scanner_types
        if _components_types:
            self.components_types = _components_types

    def _run_shell(self, cmd):
        """Run a shell command, drain its output and wait for it to exit.

        The pipe returned by ``os.popen`` is read and closed explicitly so
        that completion of the command does not depend on when the
        interpreter happens to collect an unreferenced pipe object.
        """
        pipe = os.popen(cmd)
        try:
            pipe.read()
        finally:
            pipe.close()

    def _capture_stdout(self, cmd, path):
        """Run drozer command ``cmd`` with the session's stdout sent to ``path``.

        NOTE(review): ``self.session.stdout`` is left pointing at the (closed)
        file afterwards, exactly as before; confirm whether the session should
        get its previous stream back.
        """
        with open(path, "w+") as out:
            self.session.stdout = out
            self.session.do_run(cmd)
            time.sleep(2)

    def initConfig(self, _ip, _port):
        """Connect adb, forward the drozer port and reset the screenshot dir."""
        # Use the _ip argument; the previous code read a module-level ``ip``
        # that only exists when this file is executed as a script.
        self._run_shell("adb connect %s" % _ip)
        self._run_shell("adb forward tcp:%s tcp:%s" % (_port, _port))
        self._run_shell("adb shell mkdir /sdcard/tempimg/")
        self._run_shell("adb shell rm -rf /sdcard/tempimg/*")

    def get_packages(self, temp_root_path, packageName=None):
        """Process one package (or all of them) and pull the screenshots.

        :param temp_root_path: existing working directory for cached output.
        :param packageName: package to inspect; when falsy every package
            reported by ``app.package.list`` is processed.
        """
        no_exported_path = os.path.join(temp_root_path, "no_exported")
        # "a+" creates the file when missing; rewind explicitly because the
        # initial read position of an append-mode file is not portable.
        with open(no_exported_path, "a+") as no_exported_file:
            no_exported_file.seek(0)
            self.no_exported_list = no_exported_file.readlines()

        if packageName:
            self.scopPackage(temp_root_path, packageName, self.components_types)
        else:
            self.getAllPackage(temp_root_path, self.components_types)

        # Pull every screenshot taken on the device.
        screencap_path = os.path.join(temp_root_path, "screencap")
        if not os.path.exists(screencap_path):
            os.mkdir(screencap_path)
        self._run_shell("adb pull /sdcard/tempimg/ %s" % screencap_path)

        # The list is only printed; it is not written back to no_exported.
        for ln in self.no_exported_list:
            print(ln)

    def scopPackage(self, temp_root_path, packageName, components_types):
        """Inspect the given component kinds of a single package."""
        print("+ 正在处理应用:" + str(packageName))
        for components_type in components_types:
            self.getComponentsInfo(temp_root_path, packageName, components_type)
            time.sleep(5)

    def getAllPackage(self, temp_root_path, components_types):
        """Inspect the given component kinds of every listed package.

        The package list is read from ``<temp_root_path>/package`` (created
        with ``app.package.list`` when missing), one ``name (label)`` entry
        per line. Packages for which no component is reported are appended
        to ``self.no_exported_list``.
        """
        package_path = os.path.join(temp_root_path, "package")
        if not os.path.exists(package_path):
            self._capture_stdout("app.package.list", package_path)

        with open(package_path, "r") as package_file:
            entries = [raw.strip() for raw in package_file]
        # Blank lines carry no package; previously they produced an empty
        # package name that was then queried like a real one.
        entries = [entry for entry in entries if entry]

        total = str(len(entries))
        for curred_app_num, entry in enumerate(entries, 1):
            name_end = entry.rfind(" (")
            # Without a " (label)" suffix the whole entry is the package name.
            package_name = entry if name_end == -1 else entry[:name_end]
            app_name = entry[entry.rfind("("):entry.rfind(")")]
            app_name = app_name.replace("(", "").replace(")", "")
            new_app_name = str(package_name) + "(" + str(app_name) + ")"
            print("+ 共计:" + total + "个应用,正在处理第" + str(curred_app_num) + "个应用:" + new_app_name)

            no_exported_num = 0
            for components_type in components_types:
                no_exported_num += self.getComponentsInfo(
                    temp_root_path, package_name, components_type)
                time.sleep(5)
            if no_exported_num == 0:
                self.no_exported_list.append(new_app_name + "\n")

    def getComponentsInfo(self, temp_root_path, package_name, components_type):
        """Query, report and exercise one component kind of one package.

        :returns: number of component entries found (for providers: number
            of flagged content URIs).
        """
        components_info_path = os.path.join(
            temp_root_path, package_name + "." + components_type)
        if not os.path.exists(components_info_path):
            cmd = "app.%s.info -a %s" % (components_type, package_name)
            print("\t |- 正在查询组件信息,命令为:" + cmd)
            self._capture_stdout(cmd, components_info_path)

        components_names = self.getComponents(components_info_path, components_type)
        if components_type != "provider":
            print("\t\t |- 组件:" + components_type + " 可以被导出的数量为:" + str(len(components_names)))
        else:
            print("\t\t |- 组件:" + components_type + "存在问题的内容数量为:" + str(len(components_names)))

        # The module list depends only on the component kind.
        modules = self._MODULES_BY_TYPE.get(components_type, self._PROVIDER_MODULES)
        for components_num, components_name in enumerate(components_names):
            if components_type == "provider":
                print("\t\t\t |- 可能存在问题的组件为:" + str(self._PROVIDER_TAG.sub("", components_name)))
            else:
                print("\t\t\t |- 可以导出的组件名称为:" + components_name)
            self.runComponents(temp_root_path, components_type, modules,
                               package_name, components_name, components_num)
        return len(components_names)

    def runComponents(self, temp_root_path, components_type, modules,
                      package_name, components_name, components_num):
        """Run every module against one component.

        Providers get an injection query per matching module; other kinds
        are invoked through drozer and followed by a device screenshot.
        """
        for module in modules:
            if components_type == "provider":
                content_list = components_name.split("@_@")
                finding = content_list[0]
                # NOTE(review): the original test for finding "3" was
                # ``module.startswith("")``, which is always true, so such
                # URIs are queried with every module, including the empty
                # one. Kept as is; confirm the intent.
                if ((finding == "1" and module.startswith("projection"))
                        or (finding == "2" and module.startswith("selection"))
                        or finding == "3"):
                    self.injectioninQuery(temp_root_path, package_name,
                                          content_list[1], module, components_num)
                continue

            cmd = "app.%s.%s --component %s %s" % (
                components_type, module, str(package_name), str(components_name))
            print("\t\t\t\t |- 正在执行组件,命令为:" + cmd)
            self.session.do_run(cmd)
            time.sleep(2)
            screencap_cmd = "adb shell screencap -p /sdcard/tempimg/%s_%s_%s.png" % (
                str(components_type), str(package_name), str(components_name))
            self._run_shell(screencap_cmd)
            # Start the com.mwr.dz main activity again (presumably the drozer
            # agent, to bring it back to the foreground - confirm).
            self._run_shell("adb shell am start com.mwr.dz/com.mwr.dz.activities.MainActivity")

    def getComponents(self, components_info_path, components_type):
        """Parse a cached ``app.<kind>.info`` file into a list of names.

        For providers, the first ``Read/Write Permission: null`` line triggers
        the provider scanners and their flagged URIs are returned instead.

        :returns: list of component names (or ``<flag>@_@<uri>`` strings).
        """
        components_names = []
        # Ignore empty (0 KB) files.
        if os.path.getsize(components_info_path) == 0:
            return components_names

        # Scanner output is stored next to the info file. The previous code
        # read a module-level ``temp_root_path`` that only exists when this
        # file is executed as a script.
        root_path = os.path.dirname(components_info_path)
        package = None
        with open(components_info_path) as components_info_file:
            for line in components_info_file:
                le = line.replace(" ", "").replace("\n", "")
                # The package header is remembered, not treated as a component.
                if le.startswith("Package:"):
                    package = le.replace("Package:", "")
                    continue
                # Nothing is exported / matches for this package.
                if le.startswith(self._NO_COMPONENT_MARKERS):
                    return components_names
                if components_type == "provider":
                    if le.startswith(("ReadPermission:null", "WritePermission:null")):
                        if package is None:
                            # No "Package:" header seen yet, so there is no
                            # package name to hand to the scanners.
                            return components_names
                        provider_root_path = os.path.join(root_path, components_type)
                        if not os.path.exists(provider_root_path):
                            os.mkdir(provider_root_path)
                        self.scannerProvider(components_names, provider_root_path, package)
                        return components_names
                    continue
                self.clStr(components_names, le)
        return components_names

    def clStr(self, components_names, le):
        """Fold one space-stripped info line into ``components_names``.

        Non-empty lines that are neither ``Permission...`` nor
        ``ParentActivity:``/``TargetActivity:`` are appended as component
        names. A ``Permission:`` line whose value does not start with
        ``null`` removes the most recently appended name.
        """
        if not le.startswith("Permission"):
            if le.startswith(("ParentActivity:", "TargetActivity:")):
                return
            if le:
                components_names.append(le)
            return

        permission_package = le.replace("Permission:", "")
        if not permission_package.startswith("null") and components_names:
            # pop() drops the last entry itself; remove(value) would drop the
            # first equal entry when the same name occurs more than once.
            components_names.pop()

    def scannerProvider(self, components_names, temp_root_path, package_name):
        """Run the provider scanners and collect the flagged content URIs.

        Each flagged URI is appended to ``components_names`` as
        ``<flag>@_@<uri>`` with flag 1 = injection in projection,
        2 = injection in selection, 3 = listed under "Vulnerable Providers".
        """
        for scanner_type in self.scanner_types:
            scanner_provider_path = os.path.join(
                temp_root_path, package_name + "." + scanner_type)
            if not os.path.exists(scanner_provider_path):
                cmd = "scanner.provider.%s -a %s" % (str(scanner_type), str(package_name))
                print("\t\t |- 正在扫描Provuder组件漏洞信息,命令为:" + cmd)
                self._capture_stdout(cmd, scanner_provider_path)

            flag = 0
            with open(scanner_provider_path) as scanner_provider_file:
                for line in scanner_provider_file:
                    le = line.replace(" ", "").replace("\n", "")
                    if le.startswith("NotVulnerable"):
                        # URIs under this header are not findings, wherever
                        # the header appears in the report.
                        flag = 0
                        continue
                    if le.startswith(("Novulnerabilitiesfound", "Scanning")):
                        continue
                    if le.startswith("InjectioninProjection"):
                        flag = 1
                        continue
                    if le.startswith("InjectioninSelection"):
                        flag = 2
                        continue
                    if le.startswith("VulnerableProviders"):
                        flag = 3
                        continue
                    if flag != 0 and le.startswith("content://"):
                        components_names.append(str(flag) + "@_@" + le)

    def injectioninQuery(self, temp_root_path, package_name, content_name,
                         schme_type, components_num):
        """Query a content URI with a lone quote and record the session's stderr.

        :returns: path of the file holding the captured stderr and the command.
        """
        injectionin_dir = os.path.join(temp_root_path, "injectionin")
        if not os.path.exists(injectionin_dir):
            os.mkdir(injectionin_dir)
        schme_type_err_provider_path = os.path.join(
            injectionin_dir,
            "err_" + package_name + "_" + str(components_num) + "." + schme_type)

        cmd = "app.provider.query %s --%s %s" % (str(content_name), str(schme_type), '\"\'\"')
        # Mode "w" starts from an empty file, like the former remove + "a+".
        # NOTE(review): session.stderr keeps pointing at this file after it
        # is closed, as before.
        with open(schme_type_err_provider_path, "w") as scanner_provider_file:
            self.session.stderr = scanner_provider_file
            print("\t\t\t\t |- 正在进行注入操作,命令为:" + cmd)
            self.session.do_run(cmd)
            # NOTE(review): "\r" (not "\n") is the original terminator.
            scanner_provider_file.write(cmd + "\r")
        return schme_type_err_provider_path

    def actionProvider(self, components_name):
        """Run the traversal and injection provider scanners for a package."""
        cmd = "scanner.provider.traversal -a %s" % (str(components_name))
        print("\t\t\t |- " + cmd)
        self.session.do_run(cmd)
        time.sleep(2)
        cmd = "scanner.provider.injection -a %s" % (str(components_name))
        print("\t\t\t |- " + cmd)
        self.session.do_run(cmd)
if __name__ == '__main__':
    # Existing working directory that receives the cached drozer output and
    # the pulled screenshots.
    temp_root_path = "C:\\Users\\root\\Desktop\\temp"

    # Component kinds to inspect, e.g. ["provider"] to restrict the run.
    components_types = ["activity", "service", "broadcast", "provider"]
    # None keeps the scanner list defined on Fuzz; pass e.g.
    # ["traversal", "injection"] to choose explicitly.
    scanner_types = None

    # Address and port of the drozer server. The loopback address is used
    # here; a directly reachable device address (e.g. "192.168.43.1") can be
    # substituted.
    ip = "127.0.0.1"
    port = "41415"

    # Without a package name every listed package is processed; pass one
    # (e.g. "com.mwr.example.sieve") as second argument to limit the run.
    Fuzz(ip, port, scanner_types, components_types).get_packages(temp_root_path)