Commit | Line | Data |
---|---|---|
76b903ee | 1 | #!/usr/bin/env python3 |
b2441318 | 2 | # SPDX-License-Identifier: GPL-2.0 |
76b903ee LB |
3 | |
4 | """ | |
5 | tdc.py - Linux tc (Traffic Control) unit test driver | |
6 | ||
7 | Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com> | |
8 | """ | |
9 | ||
10 | import re | |
11 | import os | |
12 | import sys | |
13 | import argparse | |
14 | import json | |
15 | import subprocess | |
16 | from collections import OrderedDict | |
17 | from string import Template | |
18 | ||
19 | from tdc_config import * | |
20 | from tdc_helper import * | |
21 | ||
22 | ||
# When True, exec_cmd() runs commands inside the $NS network namespace by
# default, and ns_create()/ns_destroy() set that namespace up and tear it down.
USE_NS = True
24 | ||
25 | ||
def replace_keywords(cmd):
    """
    Expand the $VARIABLE references in an executable command using the
    values defined in NAMES and return the resulting string. References
    with no entry in NAMES are left in place (safe substitution).
    """
    return Template(cmd).safe_substitute(NAMES)
34 | ||
35 | ||
def exec_cmd(command, nsonly=True):
    """
    Run a command through the shell and return a (process, output) tuple.

    If namespaces are in use (USE_NS) and nsonly is true, the command is
    prefixed so that it executes inside the $NS network namespace. Any
    $VARIABLE references are expanded with replace_keywords() before the
    command is launched. The returned output is the decoded stderr when the
    command exits non-zero, and the decoded stdout otherwise.
    """
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    proc = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    rawout, serr = proc.communicate()

    # Report the error stream on failure, the regular output on success.
    selected = rawout if proc.returncode == 0 else serr
    foutput = selected.decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
61 | ||
62 | ||
def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case.

    Each entry in cmdlist is either a command string, which must exit with
    status 0, or a list whose first element is the command and whose
    remaining elements are the exit codes to accept. Empty entries and
    empty commands are skipped.

    If a command exits with a status that is not accepted, the failure is
    printed, the test namespace is destroyed and the run is aborted with
    exit status 1.
    """
    for cmdinfo in cmdlist:
        # Skip "" and [] up front; indexing an empty list would raise.
        if not cmdinfo:
            continue

        if isinstance(cmdinfo, list):
            cmd = cmdinfo[0]
            exit_codes = cmdinfo[1:]
        else:
            cmd = cmdinfo
            exit_codes = [0]

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            # Clean up the namespace before bailing out of the whole run.
            ns_destroy()
            sys.exit(1)
90 | ||
91 | ||
92 | def test_runner(filtered_tests): | |
93 | """ | |
94 | Driver function for the unit tests. | |
95 | ||
96 | Prints information about the tests being run, executes the setup and | |
97 | teardown commands and the command under test itself. Also determines | |
98 | success/failure based on the information in the test case and generates | |
99 | TAP output accordingly. | |
100 | """ | |
101 | testlist = filtered_tests | |
102 | tcount = len(testlist) | |
103 | index = 1 | |
104 | tap = str(index) + ".." + str(tcount) + "\n" | |
105 | ||
106 | for tidx in testlist: | |
107 | result = True | |
108 | tresult = "" | |
109 | print("Test " + tidx["id"] + ": " + tidx["name"]) | |
110 | prepare_env(tidx["setup"]) | |
111 | (p, procout) = exec_cmd(tidx["cmdUnderTest"]) | |
112 | exit_code = p.returncode | |
113 | ||
114 | if (exit_code != int(tidx["expExitCode"])): | |
115 | result = False | |
116 | print("exit:", exit_code, int(tidx["expExitCode"])) | |
117 | print(procout) | |
118 | else: | |
119 | match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL) | |
120 | (p, procout) = exec_cmd(tidx["verifyCmd"]) | |
121 | match_index = re.findall(match_pattern, procout) | |
122 | if len(match_index) != int(tidx["matchCount"]): | |
123 | result = False | |
124 | ||
125 | if result == True: | |
126 | tresult += "ok " | |
127 | else: | |
128 | tresult += "not ok " | |
129 | tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n" | |
130 | ||
131 | if result == False: | |
132 | tap += procout | |
133 | ||
134 | prepare_env(tidx["teardown"]) | |
135 | index += 1 | |
136 | ||
137 | return tap | |
138 | ||
139 | ||
def ns_create():
    """
    Create the network namespace in which the tests will be run and set up
    the required network devices for it.

    A veth pair $DEV0/$DEV1 is created, $DEV1 is moved into the $NS
    namespace and both ends are brought up. Does nothing when USE_NS is
    false. All commands are run from the host side (nsonly=False).
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        # $DEV1 now lives inside $NS, so it must be addressed with
        # -n (netns); -s is ip's statistics flag and does not do that.
        'ip -n $NS link set $DEV1 up',
    )
    for cmd in setup_cmds:
        exec_cmd(cmd, False)
156 | ||
157 | ||
def ns_destroy():
    """
    Delete the network namespace used for testing (which also removes the
    network devices that were placed in it). Does nothing when USE_NS is
    false.
    """
    if not USE_NS:
        return
    exec_cmd('ip netns delete $NS', False)
166 | ||
167 | ||
def has_blank_ids(idlist):
    """
    Return True if at least one entry in the ID list is empty (falsy),
    and False otherwise.
    """
    for entry in idlist:
        if not entry:
            return True
    return False
173 | ||
174 | ||
def load_from_file(filename):
    """
    Read the test cases from a JSON file, keeping key order by loading
    objects as OrderedDict, and return them.

    When any test case in the file has a blank ID, every test case loaded
    from it is tagged with a 'filename' entry recording the source file.
    """
    with open(filename) as test_data:
        testlist = json.load(test_data, object_pairs_hook=OrderedDict)

    if has_blank_ids(get_id_list(testlist)):
        for case in testlist:
            case['filename'] = filename
    return testlist
187 | ||
188 | ||
def args_parse():
    """
    Build and return the (still option-less) argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
195 | ||
196 | ||
def set_args(parser):
    """
    Set the command line arguments for tdc.

    Registers the tdc options on the given parser and returns that same
    parser. Options that take an optional value (-c, -l) store a const
    when given without one: '+c' for --category and "" for --list.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    parser.add_argument(
        '-c', '--category', type=str, nargs='?', const='+c',
        help='Run tests only from the specified category, or if no category is specified, list known categories.')
    parser.add_argument(
        '-f', '--file', type=str,
        help='Run tests from the specified file')
    parser.add_argument(
        '-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
        help='List all test cases, or those only within the specified category')
    parser.add_argument(
        '-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
        help='Display the test case with specified id')
    parser.add_argument(
        '-e', '--execute', type=str, nargs=1, metavar='ID',
        help='Execute the single test case with specified ID')
    parser.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    return parser
217 | ||
218 | ||
def check_default_settings(args):
    """
    Apply any command line overrides to the default settings and make sure
    the resulting settings are usable.

    A tc path given on the command line replaces NAMES['TC']. The run is
    aborted with exit status 1 if the configured tc path is not an
    existing file.
    """
    global NAMES

    override = args.path
    if override is not None:
        NAMES['TC'] = override

    # Validate whichever tc path ended up configured.
    tc_path = NAMES['TC']
    if os.path.isfile(tc_path):
        return

    print("The specified tc path " + tc_path + " does not exist.")
    exit(1)
232 | ||
233 | ||
def get_id_list(alltests):
    """
    Collect the "id" field of every test case, in order, and return the
    resulting list.
    """
    ids = []
    for case in alltests:
        ids.append(case["id"])
    return ids
239 | ||
240 | ||
def check_case_id(alltests):
    """
    Look for duplicate test case IDs. Returns a list holding every ID that
    occurs more than once (one entry per occurrence); the list is empty
    when all IDs are unique.
    """
    idl = get_id_list(alltests)
    repeated = []
    for candidate in idl:
        if idl.count(candidate) > 1:
            repeated.append(candidate)
    return repeated
247 | ||
248 | ||
def does_id_exist(alltests, newid):
    """
    Return True if newid equals the ID of any existing test case, and
    False otherwise.
    """
    for existing in get_id_list(alltests):
        if newid == existing:
            return True
    return False
255 | ||
256 | ||
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Every test case whose "id" is the empty string receives a random
    four-digit hex ID that is not already used by another test case. Test
    cases carrying a 'filename' entry are then grouped by that file, the
    'filename' entry is removed again, and each group is written back to
    its file as JSON.

    Returns the (modified in place) list of test cases, so callers that
    rebind the result keep working with the test list.
    """
    import random
    for c in alltests:
        if c["id"] == "":
            # Keep drawing until the ID does not collide with an existing one.
            while True:
                newid = '%04x' % random.randrange(16**4)
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    ufilename = get_unique_item([c['filename'] for c in alltests
                                 if 'filename' in c])
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t and t['filename'] == f:
                # The tag is bookkeeping only; keep it out of the JSON file.
                del t['filename']
                testlist.append(t)
        # Context manager closes the file even if serialisation fails.
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests
288 | ||
289 | ||
def get_test_cases(args):
    """
    Load and return the list of test cases.

    With a test case file given on the command line, only that file is
    loaded; the run is aborted with exit status 1 if it does not exist.
    Without one, every *.json file found under the 'tc-tests' directory
    tree is loaded.
    """
    import fnmatch

    if args.file is None:
        flist = []
        for root, dirnames, filenames in os.walk('tc-tests'):
            json_names = fnmatch.filter(filenames, '*.json')
            flist.extend(os.path.join(root, name) for name in json_names)
    else:
        if not os.path.isfile(args.file):
            print("The specified test case file " + args.file + " does not exist.")
            exit(1)
        flist = [args.file]

    alltests = []
    for casefile in flist:
        alltests = alltests + load_from_file(casefile)
    return alltests
311 | ||
312 | ||
def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Modes, checked in this order:
      * gen_id: generate IDs for test cases with a blank ID, then exit.
      * showID: display a single test case, then exit.
      * category given without a value: list known categories, then exit.
      * list: list all test cases or those of one category, then exit.
      * otherwise: run the tests (all, those whose ID contains the
        requested ID, or those of the requested category). This requires
        root privileges.

    Duplicate test case IDs abort the run with exit status 1.
    """
    alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(get_id_list(alltests)):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if duplicate_ids:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    ucat = get_test_categories(alltests)

    if args.showID:
        show_test_case_by_id(alltests, args.showID[0])
        sys.exit(0)

    target_id = args.execute[0] if args.execute else ""

    # '+c' is the const stored when -c is given without a category.
    if args.category == '+c':
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)
    target_category = args.category if args.category else ""

    testcases = get_categorized_testlist(alltests, ucat)

    # A bare -l stores the empty string, which is falsy, so the option's
    # presence has to be detected by comparing against None.
    if args.list is not None:
        if len(args.list) == 0:
            list_test_cases(alltests)
            sys.exit(0)
        if args.list not in ucat:
            print("Unknown category " + args.list)
            print("Available categories:")
            print_sll(ucat)
            sys.exit(1)
        list_test_cases(testcases[args.list])
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    # Work out what to run before the namespace is created, so that the
    # error exits below cannot leave a stale namespace behind.
    if target_category:
        if target_category not in ucat:
            print("Specified category is not present in this file.")
            sys.exit(1)
        selected = testcases[target_category]
        heading = "Category " + target_category
    else:
        selected = alltests
        if target_id:
            selected = [t for t in alltests if target_id in t['id']]
            if not selected:
                print("Cannot find a test case with ID matching " + target_id)
                sys.exit(1)
        heading = "All test results: "

    ns_create()
    catresults = test_runner(selected)
    print(heading + "\n\n" + catresults)
    ns_destroy()
396 | ||
397 | ||
def main():
    """
    Entry point: build the argument parser, parse the command line
    (unrecognised arguments are tolerated and ignored), apply setting
    overrides, carry out the requested operation and exit with status 0.
    """
    parser = set_args(args_parse())
    args, _unrecognised = parser.parse_known_args()

    check_default_settings(args)
    set_operation_mode(args)

    exit(0)
411 | ||
412 | ||
# Run the driver only when executed as a script, not when imported.
if __name__ == "__main__":
    main()