forked from PKU-ASAL/WASEM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·182 lines (158 loc) · 6.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
from datetime import datetime
import json
from os import makedirs, path
import resource
import sh
import sys
def do_symgx(args):
from eunomia.arch.wasm.configuration import Configuration
Configuration.set_start_time(datetime.now().strftime("%Y%m%d%H%M%S%f"))
from SymGX import SymGX
Configuration.set_file(args.file.name)
# ecall_list must be specified
if not args.ecall_list:
print("Error: --symgx requires --ecall-list", file=sys.stderr)
exit(1)
SymGX(args)
def do_normal(args):
from seewasm.arch.wasm.configuration import Configuration
Configuration.set_start_time(datetime.now().strftime("%Y%m%d%H%M%S%f"))
print(args)
module_bytecode = args.file.read()
# create the corresponding wat file
wat_file_path = args.file.name.replace('.wasm', '.wat')
if not path.exists(wat_file_path):
sh.Command('wasm2wat')([args.file.name, "-o", wat_file_path])
print(
f"The corresponding wat file is written in: {wat_file_path}",
flush=True)
# conduct symbolic execution
if args.symbolic:
Configuration.set_verbose_flag(args.verbose)
Configuration.set_file(args.file.name)
Configuration.set_entry(args.entry)
Configuration.set_visualize(args.visualize)
Configuration.set_source_type(args.source_type)
Configuration.set_stdin(args.stdin, args.sym_stdin)
Configuration.set_sym_files(args.sym_files)
Configuration.set_incremental_solving(args.incremental)
Configuration.set_elem_index_to_func(wat_file_path)
Configuration.set_algo(args.search)
command_file_name = f"./output/result/{Configuration.get_file_name()}_{Configuration.get_start_time()}/command.json"
makedirs(path.dirname(command_file_name), exist_ok=False)
with open(command_file_name, 'w') as fp:
json.dump({"Command": " ".join(sys.argv)}, fp, indent=4)
# --args and --sym_args can exist simultaneously
# their order are fixed, i.e., --args is in front of --sym_args
# the file_name is always the argv[0]
Configuration.set_args(
Configuration.get_file_name(),
args.args, args.sym_args)
# import necessary part
from seewasm.arch.wasm.emulator import WasmSSAEmulatorEngine
from seewasm.arch.wasm.graph import Graph
wasmVM = WasmSSAEmulatorEngine(module_bytecode)
# run the emulator for SSA
Graph.wasmVM = wasmVM
Graph.initialize()
# draw the ICFG on basic block level, and exit
if Configuration.get_visualize():
from seewasm.arch.wasm.visualizator import visualize
# draw here
graph_path = path.join("output", "visualized_graph", f"{Configuration.get_file_name()}_{Configuration.get_start_time()}.gv")
visualize(Graph, graph_path)
print(f"The visualization of ICFG is done.")
return
graph = Graph()
graph.traverse()
else:
pass
def parse():
parser = argparse.ArgumentParser(
description='WASEM, a general symbolic execution framework for WebAssembly (WASM) binaries')
inputs = parser.add_argument_group('Input arguments')
inputs.add_argument('-f', '--file',
type=argparse.FileType('rb'),
help='binary file (.wasm)',
metavar='WASMMODULE', required=True)
inputs.add_argument('--stdin',
action='store',
type=str,
help='stream of stdin')
inputs.add_argument('--sym_stdin',
action='store',
type=int,
nargs=1,
help='stream of stdin in N bytes symbols')
inputs.add_argument('--args',
action='store',
type=str,
help='command line')
inputs.add_argument(
'--sym_args', type=int, nargs='+',
help="command line in symbols, each of them is N bytes at most")
inputs.add_argument(
'--sym_files', type=int, nargs=2,
help="Create N symbolic files, each of them has M symbolic bytes")
inputs.add_argument(
'--source_type', default='c', const='c', nargs='?',
choices=['c', 'go', 'rust'],
help='type of source file')
features = parser.add_argument_group('Features')
features.add_argument(
'--entry', type=str, nargs=1, default=["__original_main"],
help='set entry point as the specilized function')
features.add_argument(
'--visualize', action='store_true',
help='visualize the ICFG on basic blocks level')
features.add_argument(
'--incremental', action='store_true',
help='enable incremental solving')
features.add_argument(
'-v', '--verbose', default='warning', const='warning', nargs='?',
choices=['warning', 'info', 'debug'],
help='set the logging level')
analyze = parser.add_argument_group('Analyze')
analyze.add_argument(
'-s', '--symbolic', action='store_true',
help='perform the symbolic execution')
analyze.add_argument(
'--search', default='dfs', const='dfs', nargs='?',
choices=['dfs', 'bfs', 'random', 'interval'],
help='set the search algorithm')
analyze.add_argument(
'--max-time', action='store', type=int,
help='maximum time in seconds')
analyze.add_argument(
'--max-memory', action='store', type=int,
help='maximum memory in MB')
symgx = parser.add_argument_group('Symgx')
symgx.add_argument('--symgx', action='store_true', help='enable the branch of symgx', default=False)
symgx.add_argument('--ecall-list', help='ecall list string, separated by commas (`,`)')
symgx.add_argument('--func-list', help='function list string, separated by commas (`,`)')
args = parser.parse_args()
return args
def main():
args = parse()
if args.max_memory:
resource.setrlimit(resource.RLIMIT_AS, (args.max_memory * 1024 * 1024, args.max_memory * 1024 * 1024))
print(f"Memory limit set to {args.max_memory} MB", flush=True)
job_start_time = datetime.now()
current_time_start = job_start_time.strftime("%Y-%m-%d %H:%M:%S_%f")
print(f"Start to analyze: {current_time_start}", flush=True)
print(f"Running...", flush=True)
if args.symgx:
do_symgx(args)
else:
do_normal(args)
print(f"Finished.", flush=True)
job_end_time = datetime.now()
current_time_end = job_end_time.strftime("%Y-%m-%d %H:%M:%S_%f")
print(f"End of analyze: {current_time_end}", flush=True)
elapsed_time = job_end_time - job_start_time
print(f"Time elapsed: {elapsed_time}", flush=True)
if __name__ == '__main__':
main()