Commit | Line | Data |
---|---|---|
c48d8b04 BB |
1 | #!/usr/bin/env python3 |
2 | # SPDX-License-Identifier: GPL-2.0 | |
3 | # | |
4 | # Program to allow users to fuzz test Hyper-V drivers | |
5 | # by interfacing with Hyper-V debugfs attributes. | |
6 | # Current test methods available: | |
7 | # 1. delay testing | |
8 | # | |
9 | # Current file/directory structure of Hyper-V debugfs: | |
10 | # /sys/kernel/debug/hyperv/UUID | |
11 | # /sys/kernel/debug/hyperv/UUID/<test-state filename> | |
12 | # /sys/kernel/debug/hyperv/UUID/<test-method sub-directory> | |
13 | # | |
14 | # author: Branden Bonaby <brandonbonaby94@gmail.com> | |
15 | ||
16 | import os | |
17 | import cmd | |
18 | import argparse | |
19 | import glob | |
20 | from argparse import RawDescriptionHelpFormatter | |
21 | from argparse import RawTextHelpFormatter | |
22 | from enum import Enum | |
23 | ||
24 | # Do not change unless you change the debugfs attributes | |
25 | # in /drivers/hv/debugfs.c. All fuzz testing | |
26 | # attributes will start with "fuzz_test". | |
27 | ||
28 | # debugfs path for hyperv must exist before proceeding | |
# Root of the Hyper-V debugfs tree; the device map built in main() is
# discovered underneath this directory.
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"

# Nothing in this tool can work without the debugfs directory, so stop
# right away (at import time) when it is missing or not accessible.
if not os.path.isdir(debugfs_hyperv_path):
    print(
        "{} doesn't exist/check permissions".format(debugfs_hyperv_path)
    )
    exit(-1)
33 | ||
class dev_state(Enum):
    """Numeric values written to a device's debugfs test files.

    ``on`` is written to the test-state file to enable testing and
    ``off`` is written to a device's files to disable testing.
    """

    # testing disabled
    off = 0
    # testing enabled
    on = 1
37 | ||
38 | # File names that correspond to the files created in | |
39 | # /drivers/hv/debugfs.c | |
class f_names(Enum):
    """Names of the per-device debugfs attribute files used by this tool."""

    # file holding the on/off testing state of a device
    state_f = "fuzz_test_state"
    # file holding the ring buffer interrupt delay
    buff_f = "fuzz_test_buffer_interrupt_delay"
    # file holding the ring buffer message delay
    mess_f = "fuzz_test_message_delay"
44 | ||
45 | # Both single_actions and all_actions are used | |
46 | # for error checking and to allow for some subparser | |
47 | # names to be abbreviated. Do not abbreviate the | |
48 | # test method names, as it will become less intuitive | |
49 | # as to what the user can do. If you do decide to | |
50 | # abbreviate the test method name, make sure the main | |
51 | # function reflects this change. | |
52 | ||
# Options/sub-commands (and their one-letter aliases) that operate on
# every vmbus device; a device path must NOT accompany these.
all_actions = ["disable_all", "D", "enable_all", "view_all", "V"]

# Options/sub-commands (and their one-letter aliases) that operate on
# one vmbus device; a device path is REQUIRED for these.
single_actions = ["disable_single", "d", "enable_single", "view_single", "v"]
68 | ||
def main():
    """Parse the command line and carry out the requested action.

    Builds the device -> files map from the debugfs tree, validates the
    combination of options that was given, optionally switches testing
    on for a single device, and finally dispatches on the chosen
    sub-command (or its alias).
    """
    device_files = recursive_file_lookup(debugfs_hyperv_path, dict())
    args = parse_args()
    if not args.action:
        print ("Error, no options selected...exiting")
        exit(-1)

    # Every option that was actually supplied (truthy value) plus the
    # name of the selected sub-command; all decisions below key on it.
    selected = {
        name
        for name, value in vars(args).items()
        if value and name != "action"
    }
    selected.add(args.action)

    path = None
    if "path" in selected:
        path = args.path
    # The device map is keyed without a trailing slash; drop one if given.
    if path and path.endswith("/"):
        path = path[:-1]

    validate_args_path(path, selected, device_files)

    if path and "enable_single" in selected:
        state_file = locate_state(path, device_files)
        set_test_state(state_file, dev_state.on.value, args.quiet)

    # Use subparsers as the key for different actions
    if "delay" in selected:
        validate_delay_values(args.delay_time)
        if args.enable_all:
            set_delay_all_devices(device_files, args.delay_time, args.quiet)
        else:
            set_delay_values(path, device_files, args.delay_time, args.quiet)
    elif selected & {"disable_all", "D"}:
        disable_all_testing(device_files)
    elif selected & {"disable_single", "d"}:
        disable_testing_single_device(path, device_files)
    elif selected & {"view_all", "V"}:
        get_all_devices_test_status(device_files)
    elif selected & {"view_single", "v"}:
        get_device_test_values(path, device_files)
103 | ||
104 | # Get the state location | |
def locate_state(device, file_map):
    """Return the path of the test-state file recorded for ``device``.

    ``file_map`` maps a device to a dict of {file name: file path}; the
    entry named by ``f_names.state_f`` is looked up in it.
    """
    files_of_device = file_map[device]
    return files_of_device[f_names.state_f.value]
107 | ||
108 | # Validate delay values to make sure they are acceptable to | |
109 | # enable delays on a device | |
def validate_delay_values(delay):
    """Exit with an error unless ``delay`` holds acceptable delay values.

    ``delay`` is indexed as a pair. Both entries being -1 is rejected,
    and each entry must be either -1 or non-zero within -1..1000.
    Returns None when the values pass; prints a message and exits with
    status -1 otherwise.
    """
    if delay[0] == delay[1] == -1:
        print("\nError, At least 1 value must be greater than 0")
        exit(-1)

    for value in delay:
        within_bounds = -1 <= value <= 1000
        if within_bounds and value != 0:
            continue
        print("\nError, Values must be equal to -1 "
              "or be > 0 and <= 1000")
        exit(-1)
120 | ||
121 | # Validate argument path | |
def validate_args_path(path, arg_set, file_map):
    """Exit with an error when ``path`` does not fit the chosen options.

    path     -- device path given on the command line, or None
    arg_set  -- set of supplied option / sub-command names
    file_map -- known devices; ``path`` must be one of its keys for
                single-device options

    Prints a message and exits with status -1 on the first failed check.
    """
    single_requested = any(name in arg_set for name in single_actions)
    all_requested = any(name in arg_set for name in all_actions)

    # Single-device options cannot work without a device path.
    if not path and single_requested:
        print("Error, path (-p) REQUIRED for the specified option. "
              "Use (-h) to check usage.")
        exit(-1)

    # All-device options must not be combined with a device path.
    if path and all_requested:
        print("Error, path (-p) NOT REQUIRED for the specified option. "
              "Use (-h) to check usage." )
        exit(-1)

    # A path was given for a single-device option but it is not a device.
    if path not in file_map and single_requested:
        print("Error, path '{}' not a valid vmbus device".format(path))
        exit(-1)
136 | ||
137 | # display Testing status of single device | |
def get_device_test_values(path, file_map):
    """Print ``<file name> = <value>`` for every file of device ``path``."""
    for name, file_location in file_map[path].items():
        current = read_test_files(file_location)
        print( name + " = " + str(current))
143 | ||
144 | # Create a map of the vmbus devices and their associated files | |
145 | # [key=device, value = [key = filename, value = file path]] | |
def recursive_file_lookup(path, file_map):
    """Collect the files below ``path`` into ``file_map`` and return it.

    ``file_map`` is filled in place as
    {device directory: {file name: file path}}. A file that sits directly
    inside a device directory under ``debugfs_hyperv_path`` is filed
    under that directory; any other file is filed under the directory
    two levels above it, so files in a test-method sub-directory end up
    under their device as well.

    Only the direct children of ``path`` are listed on each call. The
    previous pattern (``path + '**/*'``) had no separator after ``path``,
    so it also matched sibling directories whose name merely started
    with ``path``.
    """
    for f_path in glob.iglob(os.path.join(path, "*")):
        if os.path.isfile(f_path):
            if f_path.rsplit("/", 2)[0] == debugfs_hyperv_path:
                # <root>/<device>/<file>
                directory = f_path.rsplit("/", 1)[0]
            else:
                # <root>/<device>/<sub-directory>/<file>
                directory = f_path.rsplit("/", 2)[0]
            f_name = f_path.split("/")[-1]
            file_map.setdefault(directory, {})[f_name] = f_path
        elif os.path.isdir(f_path):
            recursive_file_lookup(f_path, file_map)
    return file_map
162 | ||
163 | # display Testing state of devices | |
def get_all_devices_test_status(file_map):
    """Print whether testing is ON or OFF for every device in ``file_map``.

    The device is shown as the sixth "/"-separated component of its
    path (index 5).
    """
    for device in file_map:
        testing_on = get_test_state(locate_state(device, file_map)) == 1
        if testing_on:
            template = "Testing = ON for: {}"
        else:
            template = "Testing = OFF for: {}"
        print(template.format(device.split("/")[5]))
173 | ||
174 | # read the vmbus device files, path must be absolute path before calling | |
def read_test_files(path):
    """Return the integer stored on the first line of the file at ``path``.

    ``path`` must already be an absolute path. On an I/O error, or when
    the first line is not an integer, an error is printed and the
    program exits with status -1.
    """
    try:
        with open(path, "r") as f:
            return int(f.readline().strip())
    except IOError as e:
        # Use the exception attributes rather than unpacking e.args:
        # unpacking fails with ValueError when args is not a 2-tuple.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror, path))
        exit(-1)
    except ValueError:
        print ("Element to int conversion error in: \n{}".format(path))
        exit(-1)
189 | ||
190 | # writing to vmbus device files, path must be absolute path before calling | |
def write_test_files(path, value):
    """Write ``value`` (formatted with ``str.format``) to the file at ``path``.

    ``path`` must already be an absolute path. On an I/O error a message
    is printed and the program exits with status -1.
    """
    try:
        with open(path, "w") as f:
            f.write("{}".format(value))
    except IOError as e:
        # Use the exception attributes rather than unpacking e.args:
        # unpacking fails with ValueError when args is not a 2-tuple.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror, path))
        exit(-1)
201 | ||
202 | # set testing state of device | |
def set_test_state(state_path, state_value, quiet):
    """Write ``state_value`` to ``state_path`` and report the new state.

    The state is read back after the write; unless ``quiet`` is set, a
    line saying whether testing is ON or OFF is printed, naming the
    sixth "/"-separated component of ``state_path`` (index 5).
    """
    write_test_files(state_path, state_value)
    # Always read the state back, even when nothing will be printed.
    testing_on = get_test_state(state_path) == 1
    if quiet:
        return
    if testing_on:
        template = "Testing = ON for device: {}"
    else:
        template = "Testing = OFF for device: {}"
    print(template.format(state_path.split("/")[5]))
214 | ||
215 | # get testing state of device | |
def get_test_state(state_path):
    """Return the integer read from the state file at ``state_path``.

    Callers treat 1 as "testing ON" and any other value as "testing OFF".
    """
    state = read_test_files(state_path)
    return state
220 | ||
221 | # write 1 - 1000 microseconds, into a single device using the | |
222 | # fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay | |
223 | # debugfs attributes | |
def set_delay_values(device, file_map, delay_length, quiet):
    """Write the requested delays to one device and report the result.

    device       -- key of the device in ``file_map``
    file_map     -- {device: {file name: file path}}
    delay_length -- pair: [0] buffer interrupt delay, [1] message delay;
                    a value is written only when it lies in 0..1000, so
                    -1 leaves the stored delay untouched
    quiet        -- when true, do not print the resulting delay values

    A missing device or delay attribute is reported and the program
    exits with status -1. I/O failures are reported by
    write_test_files() / read_test_files(), which exit on their own, so
    no IOError handling is needed here.
    """
    try:
        device_files = file_map[device]
        interrupt = device_files[f_names.buff_f.value]
        message = device_files[f_names.mess_f.value]
    except KeyError as missing:
        print("Error, {} not found while locating delay files for '{}'"
              .format(missing, device))
        exit(-1)

    # delay[0]- buffer interrupt delay, delay[1]- message delay
    if 0 <= delay_length[0] <= 1000:
        write_test_files(interrupt, delay_length[0])
    if 0 <= delay_length[1] <= 1000:
        write_test_files(message, delay_length[1])

    if not quiet:
        print("Buffer delay testing = {} for: {}"
              .format(read_test_files(interrupt),
                      interrupt.split("/")[5]))
        print("Message delay testing = {} for: {}"
              .format(read_test_files(message),
                      message.split("/")[5]))
247 | ||
248 | # enabling delay testing on all devices | |
def set_delay_all_devices(file_map, delay, quiet):
    """Switch testing on and apply ``delay`` for every device in ``file_map``."""
    on_value = dev_state.on.value
    for device in file_map:
        state_file = locate_state(device, file_map)
        set_test_state(state_file, on_value, quiet)
        set_delay_values(device, file_map, delay, quiet)
256 | ||
257 | # disable all testing on a SINGLE device. | |
def disable_testing_single_device(device, file_map):
    """Write the "off" value to every file of ``device`` and report it.

    The message names the last "/"-separated component of ``device``.
    """
    for file_location in file_map[device].values():
        write_test_files(file_location, dev_state.off.value)
    device_name = device.split("/")[-1]
    print("ALL testing now OFF for {}".format(device_name))
264 | ||
265 | # disable all testing on ALL devices | |
def disable_all_testing(file_map):
    """Disable all testing on each device listed in ``file_map``."""
    for each_device in file_map:
        disable_testing_single_device(each_device, file_map)
270 | ||
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    The top-level parser offers --version and -q/--quiet and stores the
    chosen sub-command in ``action``. Sub-commands: ``delay``,
    ``disable_all`` (D), ``disable_single`` (d), ``view_all`` (V) and
    ``view_single`` (v).
    """
    prog_name = "vmbus_testing"

    main_usage = ("\n"
                  "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n"
                  "%(prog)s [view_all | V] [-h]\n"
                  "%(prog)s [disable_all | D] [-h]\n"
                  "%(prog)s [disable_single | d] [-h|-p]\n"
                  "%(prog)s [view_single | v] [-h|-p]\n"
                  "%(prog)s --version\n")
    main_description = ("\nUse lsvmbus to get vmbus device type "
                        "information.\n" "\nThe debugfs root path is "
                        "/sys/kernel/debug/hyperv")
    parser = argparse.ArgumentParser(
        prog=prog_name,
        usage=main_usage,
        description=main_description,
        formatter_class=RawDescriptionHelpFormatter)
    subparsers = parser.add_subparsers(dest="action")
    parser.add_argument("--version", action="version",
                        version='%(prog)s 0.1.0')
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="silence none important test messages."
                             " This will only work when enabling testing"
                             " on a device.")

    # Parent parser holding --path so that every sub-command that needs
    # a device path shares one definition.
    path_parser = argparse.ArgumentParser(add_help=False)
    path_parser.add_argument(
        "-p", "--path", metavar="",
        help="Debugfs path to a vmbus device. The path "
             "must be the absolute path to the device.")

    # Parent parser holding the mutually exclusive, mandatory choice
    # between enabling a test on all devices or on a single one.
    state_parser = argparse.ArgumentParser(add_help=False)
    state_group = state_parser.add_mutually_exclusive_group(required=True)
    state_group.add_argument(
        "-E", "--enable_all", action="store_const",
        const="enable_all",
        help="Enable the specified test type "
             "on ALL vmbus devices.")
    state_group.add_argument(
        "-e", "--enable_single", action="store_const",
        const="enable_single",
        help="Enable the specified test type on a "
             "SINGLE vmbus device.")

    # delay
    delay_cmd = subparsers.add_parser(
        "delay",
        parents=[state_parser, path_parser],
        help="Delay the ring buffer interrupt or the "
             "ring buffer message reads in microseconds.",
        prog=prog_name,
        usage="%(prog)s [-h]\n"
              "%(prog)s -E -t [value] [value]\n"
              "%(prog)s -e -t [value] [value] -p",
        description="Delay the ring buffer interrupt for "
                    "vmbus devices, or delay the ring buffer message "
                    "reads for vmbus devices (both in microseconds). This "
                    "is only on the host to guest channel.")
    delay_cmd.add_argument(
        "-t", "--delay_time", metavar="", nargs=2,
        type=check_range, default=[0,0], required=(True),
        help="Set [buffer] & [message] delay time. "
             "Value constraints: -1 == value "
             "or 0 < value <= 1000.\n"
             "Use -1 to keep the previous value for that delay "
             "type, or a value > 0 <= 1000 to change the delay "
             "time.")

    # disable_all | D
    subparsers.add_parser(
        "disable_all",
        aliases=['D'], prog=prog_name,
        usage="%(prog)s [disable_all | D] -h\n"
              "%(prog)s [disable_all | D]\n",
        help="Disable ALL testing on ALL vmbus devices.",
        description="Disable ALL testing on ALL vmbus "
                    "devices.")

    # disable_single | d
    subparsers.add_parser(
        "disable_single",
        aliases=['d'],
        parents=[path_parser], prog=prog_name,
        usage="%(prog)s [disable_single | d] -h\n"
              "%(prog)s [disable_single | d] -p\n",
        help="Disable ALL testing on a SINGLE vmbus device.",
        description="Disable ALL testing on a SINGLE vmbus "
                    "device.")

    # view_all | V
    subparsers.add_parser(
        "view_all", aliases=['V'],
        help="View the test state for ALL vmbus devices.",
        prog=prog_name,
        usage="%(prog)s [view_all | V] -h\n"
              "%(prog)s [view_all | V]\n",
        description="This shows the test state for ALL the "
                    "vmbus devices.")

    # view_single | v
    subparsers.add_parser(
        "view_single",
        aliases=['v'], parents=[path_parser],
        help="View the test values for a SINGLE vmbus "
             "device.",
        description="This shows the test values for a SINGLE "
                    "vmbus device.", prog=prog_name,
        usage="%(prog)s [view_single | v] -h\n"
              "%(prog)s [view_single | v] -p")

    return parser.parse_args()
361 | ||
362 | # value checking for range checking input in parser | |
def check_range(arg1):
    """argparse ``type`` callback: convert ``arg1`` to a valid delay value.

    Returns the integer value when it is -1 or satisfies
    0 < value <= 1000.

    Raises argparse.ArgumentTypeError when ``arg1`` is not an integer or
    lies outside that set. 0 is rejected here as well, matching the
    error message below and the later check in validate_delay_values().
    """
    try:
        val = int(arg1)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err)) from err
    if val < -1 or val == 0 or val > 1000:
        message = ("\n\nvalue must be -1 or 0 < value <= 1000. "
                   "Value program received: {}\n").format(val)
        raise argparse.ArgumentTypeError(message)
    return val
374 | ||
# Run the tool only when this file is executed as a script.
if __name__ == "__main__":
    main()