License cleanup: add SPDX GPL-2.0 license identifier to files with no license
[linux-block.git] / tools / testing / selftests / tc-testing / tdc.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
3
4 """
5 tdc.py - Linux tc (Traffic Control) unit test driver
6
7 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8 """
9
10 import re
11 import os
12 import sys
13 import argparse
14 import json
15 import subprocess
16 from collections import OrderedDict
17 from string import Template
18
19 from tdc_config import *
20 from tdc_helper import *
21
22
23 USE_NS = True
24
25
26 def replace_keywords(cmd):
27     """
28     For a given executable command, substitute any known
29     variables contained within NAMES with the correct values
30     """
31     tcmd = Template(cmd)
32     subcmd = tcmd.safe_substitute(NAMES)
33     return subcmd
34
35
36 def exec_cmd(command, nsonly=True):
37     """
38     Perform any required modifications on an executable command, then run
39     it in a subprocess and return the results.
40     """
41     if (USE_NS and nsonly):
42         command = 'ip netns exec $NS ' + command
43
44     if '$' in command:
45         command = replace_keywords(command)
46
47     proc = subprocess.Popen(command,
48         shell=True,
49         stdout=subprocess.PIPE,
50         stderr=subprocess.PIPE)
51     (rawout, serr) = proc.communicate()
52
53     if proc.returncode != 0:
54         foutput = serr.decode("utf-8")
55     else:
56         foutput = rawout.decode("utf-8")
57
58     proc.stdout.close()
59     proc.stderr.close()
60     return proc, foutput
61
62
63 def prepare_env(cmdlist):
64     """
65     Execute the setup/teardown commands for a test case. Optionally
66     terminate test execution if the command fails.
67     """
68     for cmdinfo in cmdlist:
69         if (type(cmdinfo) == list):
70             exit_codes = cmdinfo[1:]
71             cmd = cmdinfo[0]
72         else:
73             exit_codes = [0]
74             cmd = cmdinfo
75
76         if (len(cmd) == 0):
77             continue
78
79         (proc, foutput) = exec_cmd(cmd)
80
81         if proc.returncode not in exit_codes:
82             print
83             print("Could not execute:")
84             print(cmd)
85             print("\nError message:")
86             print(foutput)
87             print("\nAborting test run.")
88             ns_destroy()
89             exit(1)
90
91
92 def test_runner(filtered_tests):
93     """
94     Driver function for the unit tests.
95
96     Prints information about the tests being run, executes the setup and
97     teardown commands and the command under test itself. Also determines
98     success/failure based on the information in the test case and generates
99     TAP output accordingly.
100     """
101     testlist = filtered_tests
102     tcount = len(testlist)
103     index = 1
104     tap = str(index) + ".." + str(tcount) + "\n"
105
106     for tidx in testlist:
107         result = True
108         tresult = ""
109         print("Test " + tidx["id"] + ": " + tidx["name"])
110         prepare_env(tidx["setup"])
111         (p, procout) = exec_cmd(tidx["cmdUnderTest"])
112         exit_code = p.returncode
113
114         if (exit_code != int(tidx["expExitCode"])):
115             result = False
116             print("exit:", exit_code, int(tidx["expExitCode"]))
117             print(procout)
118         else:
119             match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
120             (p, procout) = exec_cmd(tidx["verifyCmd"])
121             match_index = re.findall(match_pattern, procout)
122             if len(match_index) != int(tidx["matchCount"]):
123                 result = False
124
125         if result == True:
126             tresult += "ok "
127         else:
128             tresult += "not ok "
129         tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
130
131         if result == False:
132             tap += procout
133
134         prepare_env(tidx["teardown"])
135         index += 1
136
137     return tap
138
139
140 def ns_create():
141     """
142     Create the network namespace in which the tests will be run and set up
143     the required network devices for it.
144     """
145     if (USE_NS):
146         cmd = 'ip netns add $NS'
147         exec_cmd(cmd, False)
148         cmd = 'ip link add $DEV0 type veth peer name $DEV1'
149         exec_cmd(cmd, False)
150         cmd = 'ip link set $DEV1 netns $NS'
151         exec_cmd(cmd, False)
152         cmd = 'ip link set $DEV0 up'
153         exec_cmd(cmd, False)
154         cmd = 'ip -s $NS link set $DEV1 up'
155         exec_cmd(cmd, False)
156
157
158 def ns_destroy():
159     """
160     Destroy the network namespace for testing (and any associated network
161     devices as well)
162     """
163     if (USE_NS):
164         cmd = 'ip netns delete $NS'
165         exec_cmd(cmd, False)
166
167
168 def has_blank_ids(idlist):
169     """
170     Search the list for empty ID fields and return true/false accordingly.
171     """
172     return not(all(k for k in idlist))
173
174
175 def load_from_file(filename):
176     """
177     Open the JSON file containing the test cases and return them as an
178     ordered dictionary object.
179     """
180     with open(filename) as test_data:
181         testlist = json.load(test_data, object_pairs_hook=OrderedDict)
182     idlist = get_id_list(testlist)
183     if (has_blank_ids(idlist)):
184         for k in testlist:
185             k['filename'] = filename
186     return testlist
187
188
189 def args_parse():
190     """
191     Create the argument parser.
192     """
193     parser = argparse.ArgumentParser(description='Linux TC unit tests')
194     return parser
195
196
197 def set_args(parser):
198     """
199     Set the command line arguments for tdc.
200     """
201     parser.add_argument('-p', '--path', type=str,
202                         help='The full path to the tc executable to use')
203     parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
204                         help='Run tests only from the specified category, or if no category is specified, list known categories.')
205     parser.add_argument('-f', '--file', type=str,
206                         help='Run tests from the specified file')
207     parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
208                         help='List all test cases, or those only within the specified category')
209     parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
210                         help='Display the test case with specified id')
211     parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
212                         help='Execute the single test case with specified ID')
213     parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
214                         help='Generate ID numbers for new test cases')
215     return parser
216     return parser
217
218
219 def check_default_settings(args):
220     """
221     Process any arguments overriding the default settings, and ensure the
222     settings are correct.
223     """
224     # Allow for overriding specific settings
225     global NAMES
226
227     if args.path != None:
228          NAMES['TC'] = args.path
229     if not os.path.isfile(NAMES['TC']):
230         print("The specified tc path " + NAMES['TC'] + " does not exist.")
231         exit(1)
232
233
234 def get_id_list(alltests):
235     """
236     Generate a list of all IDs in the test cases.
237     """
238     return [x["id"] for x in alltests]
239
240
241 def check_case_id(alltests):
242     """
243     Check for duplicate test case IDs.
244     """
245     idl = get_id_list(alltests)
246     return [x for x in idl if idl.count(x) > 1]
247
248
249 def does_id_exist(alltests, newid):
250     """
251     Check if a given ID already exists in the list of test cases.
252     """
253     idl = get_id_list(alltests)
254     return (any(newid == x for x in idl))
255
256
257 def generate_case_ids(alltests):
258     """
259     If a test case has a blank ID field, generate a random hex ID for it
260     and then write the test cases back to disk.
261     """
262     import random
263     for c in alltests:
264         if (c["id"] == ""):
265             while True:
266                 newid = str('%04x' % random.randrange(16**4))
267                 if (does_id_exist(alltests, newid)):
268                     continue
269                 else:
270                     c['id'] = newid
271                     break
272
273     ufilename = []
274     for c in alltests:
275         if ('filename' in c):
276             ufilename.append(c['filename'])
277     ufilename = get_unique_item(ufilename)
278     for f in ufilename:
279         testlist = []
280         for t in alltests:
281             if 'filename' in t:
282                 if t['filename'] == f:
283                     del t['filename']
284                     testlist.append(t)
285         outfile = open(f, "w")
286         json.dump(testlist, outfile, indent=4)
287         outfile.close()
288
289
290 def get_test_cases(args):
291     """
292     If a test case file is specified, retrieve tests from that file.
293     Otherwise, glob for all json files in subdirectories and load from
294     each one.
295     """
296     import fnmatch
297     if args.file != None:
298         if not os.path.isfile(args.file):
299             print("The specified test case file " + args.file + " does not exist.")
300             exit(1)
301         flist = [args.file]
302     else:
303         flist = []
304         for root, dirnames, filenames in os.walk('tc-tests'):
305             for filename in fnmatch.filter(filenames, '*.json'):
306                 flist.append(os.path.join(root, filename))
307     alltests = list()
308     for casefile in flist:
309         alltests = alltests + (load_from_file(casefile))
310     return alltests
311
312
313 def set_operation_mode(args):
314     """
315     Load the test case data and process remaining arguments to determine
316     what the script should do for this run, and call the appropriate
317     function.
318     """
319     alltests = get_test_cases(args)
320
321     if args.gen_id:
322         idlist = get_id_list(alltests)
323         if (has_blank_ids(idlist)):
324             alltests = generate_case_ids(alltests)
325         else:
326             print("No empty ID fields found in test files.")
327         exit(0)
328
329     duplicate_ids = check_case_id(alltests)
330     if (len(duplicate_ids) > 0):
331         print("The following test case IDs are not unique:")
332         print(str(set(duplicate_ids)))
333         print("Please correct them before continuing.")
334         exit(1)
335
336     ucat = get_test_categories(alltests)
337
338     if args.showID:
339         show_test_case_by_id(alltests, args.showID[0])
340         exit(0)
341
342     if args.execute:
343         target_id = args.execute[0]
344     else:
345         target_id = ""
346
347     if args.category:
348         if (args.category == '+c'):
349             print("Available categories:")
350             print_sll(ucat)
351             exit(0)
352         else:
353             target_category = args.category
354     else:
355         target_category = ""
356
357
358     testcases = get_categorized_testlist(alltests, ucat)
359
360     if args.list:
361         if (len(args.list) == 0):
362             list_test_cases(alltests)
363             exit(0)
364         elif(len(args.list > 0)):
365             if (args.list not in ucat):
366                 print("Unknown category " + args.list)
367                 print("Available categories:")
368                 print_sll(ucat)
369                 exit(1)
370             list_test_cases(testcases[args.list])
371             exit(0)
372
373     if (os.geteuid() != 0):
374         print("This script must be run with root privileges.\n")
375         exit(1)
376
377     ns_create()
378
379     if (len(target_category) == 0):
380         if (len(target_id) > 0):
381             alltests = list(filter(lambda x: target_id in x['id'], alltests))
382             if (len(alltests) == 0):
383                 print("Cannot find a test case with ID matching " + target_id)
384                 exit(1)
385         catresults = test_runner(alltests)
386         print("All test results: " + "\n\n" + catresults)
387     elif (len(target_category) > 0):
388         if (target_category not in ucat):
389             print("Specified category is not present in this file.")
390             exit(1)
391         else:
392             catresults = test_runner(testcases[target_category])
393             print("Category " + target_category + "\n\n" + catresults)
394
395     ns_destroy()
396
397
398 def main():
399     """
400     Start of execution; set up argument parser and get the arguments,
401     and start operations.
402     """
403     parser = args_parse()
404     parser = set_args(parser)
405     (args, remaining) = parser.parse_known_args()
406     check_default_settings(args)
407
408     set_operation_mode(args)
409
410     exit(0)
411
412
413 if __name__ == "__main__":
414     main()