-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfuzz.py
157 lines (127 loc) · 6.09 KB
/
fuzz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import subprocess, os, shutil, sys
# Per-test-case limit handed to subprocess.run(timeout=...) when running BINARY.
TIMEOUT = 20 # seconds
# Path of the d8 executable that every test case is run against.
BINARY = "./d8-68644/d8" # CHANGE THIS
# Launcher used for the (currently disabled) drcov coverage run.
DRRUN = "dynamorio/bin64/drrun" # DynamoRIO drrun path
# Only referenced from the disabled coverage-scheduling snippet inside main().
COV_EVERY_N = 100 # Gather coverage every COV_EVERY_N testcases
def check_setup():
    """Verify the working directory layout and exit with status 1 if it is incomplete.

    Checks, in order: the crashes/logs/coverage/coverage/html directories,
    the local "dynamorio" directory, and the three DynamoRIO tool paths.
    The first failing check prints its error message and terminates the
    process via sys.exit(1). Returns None when everything is present.
    """
    # The four layout directories share a single error message.
    for required_dir in ("crashes", "logs", "coverage", "coverage/html"):
        if os.path.isdir(required_dir):
            continue
        print("ERROR: Directory structure not setup correctly. Please run ./setup.sh")
        sys.exit(1)

    if not os.path.isdir("dynamorio"):
        print("ERROR: DynamoRIO not found in the local directory. Please run ./setup.sh or rename the DynamoRIO directory to \"dynamorio\"")
        sys.exit(1)

    # Every tool must exist; all() stops at the first missing path.
    dynamorio_tools = (
        "dynamorio/bin64/drrun",
        "dynamorio/tools/bin64/drcov2lcov",
        "dynamorio/tools/bin64/genhtml",
    )
    if not all(os.path.exists(tool) for tool in dynamorio_tools):
        print("ERROR: DynamoRIO was not installed correctly. Please run ./install-dynamorio.sh")
        sys.exit(1)
def _top_stack_frame(crash_msg):
    """Return the whitespace-split tokens of the first usable "#0" line, or None.

    A line is usable when it has at least four tokens, because the caller
    reads token 1 and token 3.
    """
    for line in crash_msg.split(b"\n"):
        tokens = line.split()
        if len(tokens) >= 4 and tokens[0].startswith(b"#0"):
            return tokens
    return None


def _triage_use_after_poison(frame, crash_file):
    """Save crash_file under crashes/uaps unless its signature is already logged.

    Returns 1 when the crash is new (file moved, signature appended to
    logs/call_stacks.log) and 0 when the signature was already present
    (file left where it is).
    """
    # NOTE(review): assumes the sanitizer frame layout "#0 0xADDR in SYMBOL ...",
    # i.e. token 1 is the address and token 3 the symbol -- confirm against real reports.
    address, symbol = frame[1], frame[3]
    # Only the last three characters of the address are part of the signature
    # (presumably so it stays stable across runs with different load addresses).
    signature = address[-3:] + b" " + symbol

    os.makedirs("logs", exist_ok=True)
    # "ab+" creates the log when missing, lets us read it, and always appends on write.
    with open("logs/call_stacks.log", "ab+") as log:
        log.seek(0)
        if signature in log.read().split(b"\n"):
            return 0

        os.makedirs("crashes/uaps", exist_ok=True)
        filename = f"uap_{address.decode('utf-8', errors='replace')}.js"
        shutil.move(crash_file, f"./crashes/uaps/{filename}")
        # Record the signature only after the file is safely stored, so a failed
        # move does not mark the crash as already known.
        log.write(signature + b"\n")
    return 1


def triage_crash(crash_msg, crash_file):
    """Classify a crashing test case, file it under ./crashes, and report uniqueness.

    Args:
        crash_msg: Raw stderr bytes captured from the crashing run.
        crash_file: Path of the test case that produced the crash.

    Returns:
        1 if the test case was saved as a new crash, 0 if it is a
        use-after-poison whose signature is already in logs/call_stacks.log
        (in which case the file is not moved).
    """
    # Check for use after poisons
    if b"use-after-poison" in crash_msg:
        frame = _top_stack_frame(crash_msg)
        if frame is not None:
            return _triage_use_after_poison(frame, crash_file)
        # No parsable "#0" frame: fall through and keep the file as a generic
        # crash instead of raising IndexError and stopping the fuzzer.

    # I will add in each type of crash as I find them
    os.makedirs("crashes", exist_ok=True)
    shutil.move(crash_file, f"./crashes/{os.path.basename(crash_file)}")
    return 1
# Experimental WebAssembly feature flags passed to every d8 invocation.
# If your binary requires extra arguments, add them here.
_D8_FLAGS = (
    "--experimental-wasm-eh",
    "--experimental-wasm-simd",
    "--experimental-wasm-return-call",
    "--experimental-wasm-compilation-hints",
    "--experimental-wasm-gc",
    "--experimental-wasm-typed-funcref",
    "--experimental-wasm-reftypes",
    "--experimental-wasm-threads",
    "--experimental-wasm-type-reflection",
    "--experimental-wasm-bigint",
    "--experimental-wasm-bulk-memory",
    "--experimental-wasm-mv",
)


def _timeout_percentage(timeouts, total):
    """Return timeouts/total as a percentage, or 0.0 when no test case was counted."""
    if total == 0:
        return 0.0
    return float(timeouts) / float(total) * 100.0


def main():
    """Run every file in <inputs_dir> through BINARY and triage the results.

    Command line: ``python3 fuzz.py <inputs_dir> <node_number>``.

    For each test case the binary is run with a TIMEOUT-second limit. A
    non-zero exit status is treated as a crash and handed to triage_crash();
    a timeout is counted. When all test cases are done, the timeout
    percentage is written to logs/fuzzer<node_number>.log.

    Exits with status 1 on bad arguments or an unreadable input directory.
    """
    check_setup()

    if len(sys.argv) != 3:
        print("Usage: python3 fuzz.py <inputs_dir> <node_number>")
        sys.exit(1)

    input_dir = sys.argv[1]
    try:
        node_num = int(sys.argv[2])
    except ValueError:
        print("Error: node_number must be an integer")
        sys.exit(1)

    try:
        file_list = os.listdir(input_dir)
    except OSError as err:
        # Covers missing directory, not-a-directory and permission errors alike.
        print(f"Error: cannot list directory {input_dir}: {err}")
        sys.exit(1)

    # Used to determine when to gather code coverage data.
    # No coverage is gathered with d8, so this is never set to True; to
    # re-enable, set it inside the loop when num % COV_EVERY_N == 0.
    get_coverage = False

    # ASAN_OPTIONS environment variable
    env = os.environ.copy()
    env["ASAN_OPTIONS"] = "detect_leaks=0,exitcode=42,abort_on_error=1,disable_coredump=0"

    # Used for test case number tracking and timeout percentage
    num = timeouts = crashes = unique_crashes = 0

    for num, name in enumerate(file_list, start=1):
        # os.path.join keeps absolute input directories intact.
        filename = os.path.join(input_dir, name)

        print(f"#####################################################################")
        print(f" Test case {num}: {name}")
        print(f" Timeout Percentage: {_timeout_percentage(timeouts, num)}%")
        print(f" Crashes: {crashes}")
        print(f" Unique crashes: {unique_crashes}")
        print(f"#####################################################################")
        print()

        try:
            child = subprocess.run(
                [BINARY, filename, *_D8_FLAGS],
                timeout=TIMEOUT,
                env=env,
                stdout=subprocess.DEVNULL,  # no file handle to leak
                stderr=subprocess.PIPE,
            )
        except subprocess.TimeoutExpired:
            timeouts += 1
            continue

        if child.returncode != 0:  # We crashed
            # Figure out if the crash is unique
            unique_crashes += triage_crash(child.stderr, filename)
            crashes += 1
        elif get_coverage:
            # We only want to gather coverage for non-crashing test cases
            subprocess.run(
                [DRRUN, "-t", "drcov", "-logdir", "coverage", "--", BINARY,
                 *_D8_FLAGS, filename]
            )
            get_coverage = False

    # Computed after the loop so the last test case is included and an empty
    # input directory yields 0.0 instead of an undefined variable.
    timeout_percent = _timeout_percentage(timeouts, num)
    with open(f"logs/fuzzer{node_num}.log", "w") as log:
        log.write(f"Timeout percentage: {timeout_percent}%\n")
# Start fuzzing only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()