regression: parallelise run_tests.py. New options: -j, --legacy-status.

Japheth Lim 2016-01-07 17:22:00 +11:00
parent da5f50aa94
commit 7d24c4a5b5
1 changed file with 86 additions and 38 deletions
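In short: run_tests.py can now run independent tests concurrently. For example (illustrative invocation; the exact script path depends on the checkout), ./run_tests.py -j 4 runs up to four tests in parallel, and adding --legacy-status keeps the old sequential-style status lines.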


@@ -23,10 +23,12 @@ import datetime
 import fnmatch
 import memusage
 import os
+import Queue
 import signal
 import subprocess
 import sys
 import testspec
+import threading
 import traceback
 # Try importing psutil.
@@ -108,7 +110,7 @@ def kill_family(parent_pid):
 # Log only contains the output if verbose is *false*; otherwise, the
 # log is output to stdout where we can't easily get to it.
 #
-def run_test(test, verbose=False):
+def run_test(test, status_queue, verbose=False):
     # Construct the base command.
     command = ["bash", "-c", test.command]
@@ -150,39 +152,45 @@ def run_test(test, verbose=False):
     atexit.register(lambda: kill_family(process.pid))
     # Setup an alarm at the timeout.
-    was_timeout = [False]
-    def alarm_handler(sig, _):
-        was_timeout[0] = True
+    was_timeout = False
+    def do_timeout():
+        was_timeout = True
         kill_family(process.pid)
-    signal.signal(signal.SIGALRM, alarm_handler)
-    signal.alarm(test.timeout)
+    timer = threading.Timer(test.timeout, do_timeout)
     # Wait for the command to finish.
     with memusage.process_poller(process.pid) as m:
         (output, _) = process.communicate()
         peak_mem_usage = m.peak_mem_usage()
-    # Cancel the alarm. Small race here (if the alarm fires just after the
+    # Cancel the alarm. Small race here (if the timer fires just after the
     # process finished), but the returncode of our process should still be 0,
     # and hence we won't interpret the result as a timeout.
-    signal.alarm(0)
+    if not was_timeout:
+        timer.cancel()
     if output == None:
         output = ""
     if process.returncode == 0:
         status = "pass"
-    elif was_timeout[0]:
+    elif was_timeout:
         status = "TIMEOUT"
     else:
         status = "FAILED"
-    return (process.returncode == 0, status, output, datetime.datetime.now() - start_time, peak_mem_usage)
+    status_queue.put({'name': test.name,
+                      'status': status,
+                      'output': output,
+                      'real_time': datetime.datetime.now() - start_time,
+                      'mem_usage': peak_mem_usage})
 # Print a status line.
-def print_test_line_start(test_name):
-    print(" running %-25s " % (test_name + " ..."), end="")
+def print_test_line_start(test_name, legacy=False):
+    if legacy:
+        return
+    print(" Running %-25s " % (test_name + " ..."))
     sys.stdout.flush()
-def print_test_line_end(test_name, color, status, time_taken, mem):
+def print_test_line(test_name, color, status, time_taken, mem, legacy=False):
     if mem:
         # Report memory usage in gigabytes.
         mem = '%5.2fGB' % round(float(mem) / 1024 / 1024 / 1024, 2)
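Aside: the move from signal.alarm to threading.Timer is what lets run_test execute in worker threads, since Python only permits installing signal handlers in the main thread. A minimal sketch of a timer-based timeout (names are illustrative, not the script's own; the real code kills the whole process family and records memory usage):

    import subprocess
    import threading

    def run_with_timeout(cmd, timeout):
        # Start the command and arm a timer that kills it when the timeout expires.
        process = subprocess.Popen(cmd)
        timed_out = [False]            # a list, so the closure can mutate it
        def on_timeout():
            timed_out[0] = True
            process.kill()
        timer = threading.Timer(timeout, on_timeout)
        timer.start()
        process.wait()
        timer.cancel()
        return process.returncode, timed_out[0]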
@@ -193,14 +201,15 @@ def print_test_line_end(test_name, color, status, time_taken, mem):
     extras = ', '.join(filter(None, [time_taken, mem]))
     # Print status line.
-    print(output_color(color, "%-10s" % status) + ('(%s)' % extras if extras else ''))
+    if legacy:
+        front = ' running %-25s ' % (test_name + " ...")
+    else:
+        front = ' Finished %-25s ' % test_name
+    print(front +
+          output_color(color, "%-10s" % status) +
+          ('(%s)' % extras if extras else ''))
     sys.stdout.flush()
-def print_test_line(test_name, color, status, time_taken, mem):
-    print_test_line_start(test_name)
-    print_test_line_end(test_name, color, status, time_taken, mem)
 #
 # Recursive glob
 #
@@ -216,7 +225,7 @@ def rglob(base_dir, pattern):
 #
 def main():
     # Parse arguments
-    parser = argparse.ArgumentParser(description="Simple Regression Framework")
+    parser = argparse.ArgumentParser(description="Parallel Regression Framework")
     parser.add_argument("-s", "--strict", action="store_true",
             help="be strict when parsing test XML files")
     parser.add_argument("-d", "--directory", action="store",
@@ -224,10 +233,14 @@ def main():
             default=os.getcwd())
     parser.add_argument("--brief", action="store_true",
             help="don't print failure logs at end of test run")
+    parser.add_argument("-j", "--jobs", type=int, default=1,
+            help="Number of tests to run in parallel")
     parser.add_argument("-l", "--list", action="store_true",
             help="list known tests")
     parser.add_argument("--legacy", action="store_true",
             help="use legacy 'IsaMakefile' specs")
+    parser.add_argument("--legacy-status", action="store_true",
+            help="emulate legacy (sequential code) status lines")
     parser.add_argument("-v", "--verbose", action="store_true",
             help="print test output")
     parser.add_argument("tests", metavar="TESTS",
@@ -235,6 +248,9 @@ def main():
             nargs="*")
     args = parser.parse_args()
+    if args.jobs < 1:
+        parser.error("Number of parallel jobs must be at least 1")
     # Search for test files:
     if not args.legacy:
         test_xml = sorted(rglob(args.directory, "tests.xml"))
@@ -272,26 +288,60 @@ def main():
             "\n")
     # Run the tests.
-    print("Running %d test(s)...\n" % len(tests_to_run))
+    print("Running %d test(s)..." % len(tests_to_run))
     failed_tests = set()
+    passed_tests = set()
     failed_test_log = []
-    for t in tests_to_run:
-        if len(t.depends & failed_tests) > 0:
-            print_test_line(t.name, ANSI_YELLOW, "skipped", None, None)
-            failed_tests.add(t.name)
-            continue
-        # Run the test.
-        print_test_line_start(t.name)
-        (passed, status, log, time_taken, mem) = run_test(t, verbose=args.verbose)
+    # Use a simple list to store the pending queue. We track the dependencies separately.
+    tests_queue = [t for t in tests_to_run]
+    # Current jobs.
+    current_jobs = {}
+    # Output status.
+    status_queue = Queue.Queue()
-        # Print result.
-        if not passed:
-            failed_tests.add(t.name)
-            failed_test_log.append((t.name, log, time_taken))
-            print_test_line_end(t.name, ANSI_RED, "%s *" % status, time_taken, mem)
-        else:
-            print_test_line_end(t.name, ANSI_GREEN, status, time_taken, mem)
+    while tests_queue or current_jobs:
+        popped_test = False
+        # Check if we have a job slot.
+        if len(current_jobs) < args.jobs:
+            # Find the first non-blocked test and handle it.
+            for i, t in enumerate(tests_queue):
+                # Non-blocked and open. Start it.
+                if t.depends.issubset(passed_tests):
+                    test_thread = threading.Thread(target=run_test, name=t.name,
+                                                   args=(t, status_queue, args.verbose))
+                    print_test_line_start(t.name, args.legacy_status)
+                    test_thread.start()
+                    current_jobs[t.name] = test_thread
+                    popped_test = True
+                    del tests_queue[i]
+                    break
+                # Non-blocked but depends on a failed test. Remove it.
+                if len(t.depends & failed_tests) > 0:
+                    print_test_line(t.name, ANSI_YELLOW, "skipped", None, None, args.legacy_status)
+                    failed_tests.add(t.name)
+                    del tests_queue[i]
+                    break
+        # Wait for jobs to complete.
+        try:
+            while True:
+                info = status_queue.get(block=True, timeout=0.1337) # Built-in pause
+                name = info['name']
+                del current_jobs[name]
+                status, log, time_taken, mem = info['status'], info['output'], info['real_time'], info['mem_usage']
+                # Print result.
+                if status != 'pass':
+                    failed_tests.add(name)
+                    failed_test_log.append((name, log, time_taken))
+                    print_test_line(name, ANSI_RED, "%s *" % status, time_taken, mem, args.legacy_status)
+                else:
+                    passed_tests.add(name)
+                    print_test_line(name, ANSI_GREEN, status, time_taken, mem, args.legacy_status)
+        except Queue.Empty:
+            # Nothing to do
+            pass
     # Print failure summaries unless requested not to.
     if not args.brief and len(failed_test_log) > 0:
@@ -324,5 +374,3 @@ def main():
 if __name__ == "__main__":
     main()
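For reference, the scheduling pattern the new main loop uses is a plain producer/consumer setup: each worker thread reports its result as a dict on a shared Queue.Queue, and the main thread drains the queue with a short timeout so it can keep starting new jobs. A minimal, self-contained sketch of that pattern (hypothetical names, Python 2 style to match the script, without the dependency handling):

    import threading
    import Queue   # Python 2 module name; 'queue' on Python 3

    def worker(name, status_queue):
        # Do the actual work here, then report the outcome to the main thread.
        status_queue.put({'name': name, 'status': 'pass'})

    def dispatch(names, max_jobs=2):
        pending = list(names)
        current = {}                  # name -> Thread for running jobs
        status_queue = Queue.Queue()
        results = []
        while pending or current:
            # Fill free job slots.
            while pending and len(current) < max_jobs:
                name = pending.pop(0)
                t = threading.Thread(target=worker, args=(name, status_queue))
                current[name] = t
                t.start()
            # Drain any finished results; the timeout keeps the loop responsive.
            try:
                while True:
                    info = status_queue.get(block=True, timeout=0.1)
                    del current[info['name']]
                    results.append(info)
            except Queue.Empty:
                pass
        return results

    print(dispatch(['test-a', 'test-b', 'test-c'], max_jobs=2))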