-
Notifications
You must be signed in to change notification settings - Fork 3
/
cluster_fuzzer.py
118 lines (94 loc) · 3.79 KB
/
cluster_fuzzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import multiprocessing
import subprocess
import sys
import serial.tools.list_ports
from main_fuzzer import MainFuzzer
class ClusterFuzzer:
"""This script can be used to split up the test cases of a fuzzer equally over multiple boards."""
# The number of boards used in parallel.
BATCH_SIZE = 16
# The cluster only consists of Basys3 boards.
BOARD = "basys3"
# FDTI USB Vendor ID and Product ID of the Basys3 boards.
VID_PID = "0403:6010"
# The default OpenOCD port is 6666.
# For other OpenOCD instances the port is incremented.
OPENOCD_PORT = 6666
def __init__(self, fuzzer_name) -> None:
bus_ports = self._get_bus_ports()
if len(bus_ports) < self.BATCH_SIZE:
raise Exception(
f"{self.BATCH_SIZE} boards requested but only {len(bus_ports)} boards found"
)
bus_ports.sort()
test_case_count = self._get_test_case_count(fuzzer_name)
if test_case_count < self.BATCH_SIZE:
self.BATCH_SIZE = test_case_count
test_cases_per_board, rest = divmod(test_case_count, self.BATCH_SIZE)
print(f"test_cases_per_board: {test_cases_per_board}, rest: {rest}")
with multiprocessing.Pool(self.BATCH_SIZE) as p:
index_start = 1
for i in range(self.BATCH_SIZE):
index_end = index_start + test_cases_per_board - 1
if i < rest:
index_end += 1
print(
f"board: {i}, index_start: {index_start}, index_end: {index_end}, openocd_port: {self.OPENOCD_PORT + i}, openocd_bus_port: {bus_ports[i]}"
)
p.apply_async(
MainFuzzer,
kwds={
"fuzzer_name": fuzzer_name,
"board": self.BOARD,
"index_start": index_start,
"index_end": index_end,
"quiet": True,
"openocd_port": self.OPENOCD_PORT + i,
"openocd_bus_port": bus_ports[i],
},
)
index_start += test_cases_per_board
p.close()
p.join()
def _get_bus_ports(self) -> list[str]:
"""Returns a list of the bus ports of the currently activated boards of the cluster."""
return [
# Cut :config.interface and only return bus and ports.
# http://www.linux-usb.org/FAQ.html#i6
port.location.split(":")[0]
for port in serial.tools.list_ports.grep(f"{self.VID_PID}.*:1\\.1")
]
def _get_test_case_count(self, fuzzer_name) -> int:
"""Return the total test case count of the specified fuzzer."""
process = subprocess.run(
[
"python",
"main_fuzzer.py",
"--fuzzer-name",
fuzzer_name,
"--board",
self.BOARD,
"--quiet", # Required so that only digits are in the response.
"--count-test-cases",
],
capture_output=True,
check=True,
encoding="utf-8",
)
print(process.stdout)
# The number of test cases are the only digits in the response.
test_case_count = ""
for char in process.stdout:
if char.isdigit():
test_case_count += char
return int(test_case_count)
if __name__ == "__main__":
if len(sys.argv) > 1:
fuzzer_name = sys.argv[1]
else:
raise ValueError("please specify a fuzzer name")
try:
ClusterFuzzer(fuzzer_name)
finally:
# Make sure that all OpenOCD processes are killed, regardless of how the script was terminated.
subprocess.run(["pkill", "openocd"])