regression: add --fail-fast option; refactor stringly typed code.

This commit is contained in:
Japheth Lim 2016-02-03 17:44:12 +11:00
parent e409f357e9
commit 404b779e21
1 changed files with 133 additions and 53 deletions

View File

@ -35,6 +35,7 @@ import testspec
import threading
import time
import traceback
import warnings
import xml.etree.ElementTree as ET
try:
@ -104,6 +105,31 @@ def kill_family(parent_pid):
for p in process_list:
p.send_signal(signal.SIGKILL)
# Process statuses
(RUNNING, # Still running
PASSED, # Passed
FAILED, # Failed
SKIPPED, # Failed dependencies
ERROR, # Failed to run test at all
TIMEOUT, # Wall timeout
CPU_TIMEOUT, # CPU timeout
STUCK, # No CPU activity detected
CANCELLED # Cancelled for external reasons
) = range(9)
# for print_test_line
status_name = ['RUNNING (***bug***)',
'passed',
'FAILED',
'SKIPPED',
'ERROR',
'TIMEOUT',
'TIMEOUT',
'STUCK',
'CANCELLED']
status_maxlen = max(len(s) for s in status_name[1:]) + len(" *")
#
# Run a single test.
#
@ -113,7 +139,8 @@ def kill_family(parent_pid):
# Log only contains the output if verbose is *false*; otherwise, the
# log is output to stdout where we can't easily get to it.
#
def run_test(test, status_queue, verbose=False, stuck_timeout=None):
# kill_switch is a threading.Event that is set if the --fail-fast feature is triggered.
def run_test(test, status_queue, kill_switch, verbose=False, stuck_timeout=None):
# Construct the base command.
command = ["bash", "-c", test.command]
@ -150,25 +177,47 @@ def run_test(test, status_queue, verbose=False, stuck_timeout=None):
output = "Exception while running test:\n\n%s" % (traceback.format_exc())
if verbose:
print(output)
return (False, "ERROR", output, datetime.datetime.now() - start_time, peak_mem_usage)
status_queue.put({'name': test.name,
'status': ERROR,
'output': output,
'real_time': datetime.datetime.now() - start_time,
'cpu_time': 0,
'mem_usage': peak_mem_usage})
return
# Now running the test.
# Wrap in a list to prevent nested functions getting the wrong scope
test_status = [RUNNING]
# If we exit for some reason, attempt to kill our test processes.
process_running = True
def emergency_stop():
if process_running:
if test_status[0] is RUNNING:
kill_family(process.pid)
atexit.register(emergency_stop)
# Setup an alarm at the timeout.
was_timeout = [False] # Wrap in list to prevent do_timeout getting the wrong variable scope
was_cpu_timeout = [False] # as above
# Setup a timer for the timeout.
def do_timeout():
was_timeout[0] = True
kill_family(process.pid)
if test_status[0] is RUNNING:
test_status[0] = TIMEOUT
kill_family(process.pid)
timer = threading.Timer(test.timeout, do_timeout)
if test.timeout > 0:
timer.start()
# Poll the kill switch.
def watch_kill_switch():
while True:
if test_status[0] is not RUNNING:
break
if kill_switch.wait(1):
if test_status[0] is not RUNNING:
break
test_status[0] = CANCELLED
kill_family(process.pid)
kill_switch_thread = threading.Thread(target=watch_kill_switch)
kill_switch_thread.daemon = True
kill_switch_thread.start()
with cpuusage.process_poller(process.pid) as c:
# Inactivity timeout
low_cpu_usage = 0.05 # 5% -- FIXME: hardcoded
@ -179,7 +228,7 @@ def run_test(test, status_queue, verbose=False, stuck_timeout=None):
# Also set a CPU timeout. We poll the cpu usage periodically.
def cpu_timeout():
interval = min(0.5, test.cpu_timeout / 10.0)
while process_running:
while test_status[0] is RUNNING:
cpu_usage = c.cpu_usage()
if stuck_timeout:
@ -200,12 +249,12 @@ def run_test(test, status_queue, verbose=False, stuck_timeout=None):
if (now - cpu_history[0][0] >= stuck_timeout and
cpu_usage_total[0] / len(cpu_history) < low_cpu_usage):
was_cpu_timeout[0] = 'stuck'
test_status[0] = STUCK
kill_family(process.pid)
break
if cpu_usage > test.cpu_timeout:
was_cpu_timeout[0] = 'total'
test_status[0] = CPU_TIMEOUT
kill_family(process.pid)
break
@ -223,30 +272,28 @@ def run_test(test, status_queue, verbose=False, stuck_timeout=None):
peak_mem_usage = m.peak_mem_usage()
cpu_usage = c.cpu_usage()
process_running = False
if process.returncode == 0:
test_status[0] = PASSED
elif test_status[0] is RUNNING:
# No special status, so assume it failed by itself
test_status[0] = FAILED
if test.cpu_timeout > 0:
# prevent cpu_timer using c after it goes away
cpu_timer.join()
# Cancel the alarm. Small race here (if the timer fires just after the
# Cancel the timer. Small race here (if the timer fires just after the
# process finished), but the returncode of our process should still be 0,
# and hence we won't interpret the result as a timeout.
if not was_timeout[0]:
if test_status[0] is not TIMEOUT:
timer.cancel()
if output == None:
output = ""
output = output.decode()
if process.returncode == 0:
status = "pass"
elif was_timeout[0] or was_cpu_timeout[0]:
if was_cpu_timeout[0] == 'stuck':
status = "STUCK for %gs" % stuck_timeout
else:
status = "TIMEOUT"
else:
status = "FAILED"
status_queue.put({'name': test.name,
'status': status,
'status': test_status[0],
'output': output,
'real_time': datetime.datetime.now() - start_time,
'cpu_time': cpu_usage,
@ -281,8 +328,11 @@ def print_test_line(test_name, color, status, real_time=None, cpu_time=None, mem
front = ' running %-25s ' % (test_name + " ...")
else:
front = ' Finished %-25s ' % test_name
status_str = status_name[status]
if status is not PASSED:
status_str += " *"
print(front +
output_color(color, "%-10s" % status) +
output_color(color, "{:<{}} ".format(status_str, status_maxlen)) +
('(%s)' % extras if extras else ''))
sys.stdout.flush()
@ -309,6 +359,8 @@ def main():
default=os.getcwd())
parser.add_argument("--brief", action="store_true",
help="don't print failure logs at end of test run")
parser.add_argument("-f", "--fail-fast", action="store_true",
help="exit once the first failure is detected")
parser.add_argument("-j", "--jobs", type=int, default=1,
help="Number of tests to run in parallel")
parser.add_argument("-l", "--list", action="store_true",
@ -323,7 +375,7 @@ def main():
parser.add_argument("--junit-report", metavar="FILE",
help="write JUnit-style test report")
parser.add_argument("--stuck-timeout", type=int, default=600, metavar='N',
help="timeout tests if not using CPU for N seconds (default: %(default))")
help="timeout tests if not using CPU for N seconds (default: 600)")
parser.add_argument("tests", metavar="TESTS",
help="tests to run (defaults to all tests)",
nargs="*")
@ -367,7 +419,7 @@ def main():
tests_queue = tests_to_run[:]
# Current jobs.
current_jobs = {}
# Output status.
# Newly finished jobs.
status_queue = Queue.Queue()
# If run from a tty and -v is off, we also track
@ -380,6 +432,9 @@ def main():
sys.stdout.flush()
tty_status_line[0] = ""
# Handle --fail-fast
kill_switch = threading.Event()
while tests_queue or current_jobs:
# Update status line with pending jobs.
if current_jobs and sys.stdout.isatty() and not args.verbose:
@ -393,10 +448,21 @@ def main():
for i, t in enumerate(tests_queue):
# Leave out dependencies that were excluded at the command line.
real_depends = t.depends & set(t.name for t in tests_to_run)
# Non-blocked but depends on a failed test. Remove it.
if (len(real_depends & failed_tests) > 0
# --fail-fast triggered, fail all subsequent tests
or kill_switch.is_set()):
wipe_tty_status()
print_test_line(t.name, ANSI_YELLOW, SKIPPED, legacy=args.legacy_status)
failed_tests.add(t.name)
del tests_queue[i]
break
# Non-blocked and open. Start it.
if real_depends.issubset(passed_tests):
test_thread = threading.Thread(target=run_test, name=t.name,
args=(t, status_queue, args.verbose, args.stuck_timeout))
args=(t, status_queue, kill_switch,
args.verbose, args.stuck_timeout))
wipe_tty_status()
print_test_line_start(t.name, args.legacy_status)
test_thread.start()
@ -404,13 +470,6 @@ def main():
popped_test = True
del tests_queue[i]
break
# Non-blocked but depends on a failed test. Remove it.
if len(real_depends & failed_tests) > 0:
wipe_tty_status()
print_test_line(t.name, ANSI_YELLOW, "skipped", legacy=args.legacy_status)
failed_tests.add(t.name)
del tests_queue[i]
break
# Wait for jobs to complete.
try:
@ -423,16 +482,21 @@ def main():
# Print result.
wipe_tty_status()
if status != 'pass':
failed_tests.add(name)
status += " * "
colour = ANSI_RED
else:
if status is PASSED:
passed_tests.add(name)
colour = ANSI_GREEN
elif status is CANCELLED:
failed_tests.add(name)
colour = ANSI_YELLOW
else:
failed_tests.add(name)
colour = ANSI_RED
print_test_line(name, colour, status,
real_time=info['real_time'], cpu_time=info['cpu_time'], mem=info['mem_usage'],
legacy=args.legacy_status)
if args.fail_fast and status != PASSED:
# Notify current threads and future tests
kill_switch.set()
except Queue.Empty:
pass
wipe_tty_status()
@ -451,10 +515,13 @@ def main():
continue
print_line()
print("TEST FAILURE: %s" % t.name)
print("TEST %s: %s" % (status_name[test_results[t.name]['status']], t.name))
print("")
log = test_results[t.name]['output'].rstrip("\n") + "\n"
lines = log.split("\n")
output = test_results[t.name]['output'].rstrip("\n")
if output:
lines = output.split("\n") + ['']
else:
lines = ['(no output)']
if len(lines) > LINE_LIMIT:
lines = ["..."] + lines[-LINE_LIMIT:]
print("\n".join(lines))
@ -469,21 +536,32 @@ def main():
# test was skipped
testcase = ET.SubElement(testsuite, "testcase",
classname="", name=t.name, time="0")
ET.SubElement(testcase, "error", type="error").text = (
"Failed dependencies: " + ', '.join(t.depends & failed_tests))
if t.depends & failed_tests:
ET.SubElement(testcase, "error", type="error").text = (
"Failed dependencies: " + ', '.join(t.depends & failed_tests))
else:
ET.SubElement(testcase, "error", type="error").text = "Cancelled"
else:
info = test_results[t.name]
testcase = ET.SubElement(testsuite, "testcase",
classname="", name=t.name, time='%f' % info['real_time'].total_seconds())
if info['status'] == "FAILED":
ET.SubElement(testcase, "failure", type="failure").text = info['output']
elif info['status'] == "TIMEOUT":
ET.SubElement(testcase, "error", type="timeout").text = info['output']
elif "STUCK" in info['status']:
ET.SubElement(testcase, "error", type="stuck").text = info['output']
else:
if info['status'] is PASSED:
if not args.verbose:
ET.SubElement(testcase, "system-out").text = info['output']
elif info['status'] is FAILED:
ET.SubElement(testcase, "failure", type="failure").text = info['output']
elif info['status'] in (TIMEOUT, CPU_TIMEOUT):
ET.SubElement(testcase, "error", type="timeout").text = info['output']
elif info['status'] is STUCK:
ET.SubElement(testcase, "error", type="stuck").text = info['output']
elif info['status'] is CANCELLED:
ET.SubElement(testcase, "error", type="cancelled").text = info['output']
elif info['status'] is ERROR:
ET.SubElement(testcase, "error", type="error").text = info['output']
else:
warnings.warn("Unknown status code: {}".format(info['status']))
ET.SubElement(testcase, "error", type="unknown").text = info['output']
ET.ElementTree(testsuite).write(args.junit_report)
# Print summary.
@ -492,6 +570,8 @@ def main():
% (len(tests_to_run) - len(failed_tests), len(tests_to_run)))
if len(failed_tests) > 0:
print(output_color(ANSI_RED, "Tests failed.") + "\n")
if kill_switch.is_set():
print("Exiting early due to --fail-fast.")
sys.exit(1)
else:
print(output_color(ANSI_GREEN, "All tests passed.") + "\n")