summaryrefslogtreecommitdiffstats
path: root/lib/psutil/tests/runner.py
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/psutil/tests/runner.py
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'lib/psutil/tests/runner.py')
-rw-r--r--lib/psutil/tests/runner.py350
1 files changed, 350 insertions, 0 deletions
diff --git a/lib/psutil/tests/runner.py b/lib/psutil/tests/runner.py
new file mode 100644
index 0000000..2e6f83e
--- /dev/null
+++ b/lib/psutil/tests/runner.py
@@ -0,0 +1,350 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Unit test runner, providing new features on top of unittest module:
+- colourized output
+- parallel run (UNIX only)
+- print failures/tracebacks on CTRL+C
+- re-run failed tests only (make test-failed)
+
+Invocation examples:
+- make test
+- make test-failed
+
+Parallel:
+- make test-parallel
+- make test-process ARGS=--parallel
+"""
+
+from __future__ import print_function
+
+import atexit
+import optparse
+import os
+import sys
+import textwrap
+import time
+import unittest
+
+
+try:
+ import ctypes
+except ImportError:
+ ctypes = None
+
+try:
+ import concurrencytest # pip install concurrencytest
+except ImportError:
+ concurrencytest = None
+
+import psutil
+from psutil._common import hilite
+from psutil._common import print_color
+from psutil._common import term_supports_colors
+from psutil._compat import super
+from psutil.tests import CI_TESTING
+from psutil.tests import import_module_by_path
+from psutil.tests import print_sysinfo
+from psutil.tests import reap_children
+from psutil.tests import safe_rmpath
+
+
+VERBOSITY = 2
+FAILED_TESTS_FNAME = '.failed-tests.txt'
+NWORKERS = psutil.cpu_count() or 1
+USE_COLORS = not CI_TESTING and term_supports_colors()
+
+HERE = os.path.abspath(os.path.dirname(__file__))
+loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
+
+
+def cprint(msg, color, bold=False, file=None):
+ if file is None:
+ file = sys.stderr if color == 'red' else sys.stdout
+ if USE_COLORS:
+ print_color(msg, color, bold=bold, file=file)
+ else:
+ print(msg, file=file)
+
+
+class TestLoader:
+
+ testdir = HERE
+ skip_files = ['test_memleaks.py']
+ if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ:
+ skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py'])
+
+ def _get_testmods(self):
+ return [os.path.join(self.testdir, x)
+ for x in os.listdir(self.testdir)
+ if x.startswith('test_') and x.endswith('.py') and
+ x not in self.skip_files]
+
+ def _iter_testmod_classes(self):
+ """Iterate over all test files in this directory and return
+ all TestCase classes in them.
+ """
+ for path in self._get_testmods():
+ mod = import_module_by_path(path)
+ for name in dir(mod):
+ obj = getattr(mod, name)
+ if isinstance(obj, type) and \
+ issubclass(obj, unittest.TestCase):
+ yield obj
+
+ def all(self):
+ suite = unittest.TestSuite()
+ for obj in self._iter_testmod_classes():
+ test = loadTestsFromTestCase(obj)
+ suite.addTest(test)
+ return suite
+
+ def last_failed(self):
+ # ...from previously failed test run
+ suite = unittest.TestSuite()
+ if not os.path.isfile(FAILED_TESTS_FNAME):
+ return suite
+ with open(FAILED_TESTS_FNAME, 'rt') as f:
+ names = f.read().split()
+ for n in names:
+ test = unittest.defaultTestLoader.loadTestsFromName(n)
+ suite.addTest(test)
+ return suite
+
+ def from_name(self, name):
+ if name.endswith('.py'):
+ name = os.path.splitext(os.path.basename(name))[0]
+ return unittest.defaultTestLoader.loadTestsFromName(name)
+
+
+class ColouredResult(unittest.TextTestResult):
+
+ def addSuccess(self, test):
+ unittest.TestResult.addSuccess(self, test)
+ cprint("OK", "green")
+
+ def addError(self, test, err):
+ unittest.TestResult.addError(self, test, err)
+ cprint("ERROR", "red", bold=True)
+
+ def addFailure(self, test, err):
+ unittest.TestResult.addFailure(self, test, err)
+ cprint("FAIL", "red")
+
+ def addSkip(self, test, reason):
+ unittest.TestResult.addSkip(self, test, reason)
+ cprint("skipped: %s" % reason.strip(), "brown")
+
+ def printErrorList(self, flavour, errors):
+ flavour = hilite(flavour, "red", bold=flavour == 'ERROR')
+ super().printErrorList(flavour, errors)
+
+
+class ColouredTextRunner(unittest.TextTestRunner):
+ """
+ A coloured text runner which also prints failed tests on KeyboardInterrupt
+ and save failed tests in a file so that they can be re-run.
+ """
+ resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.failed_tnames = set()
+
+ def _makeResult(self):
+ # Store result instance so that it can be accessed on
+ # KeyboardInterrupt.
+ self.result = super()._makeResult()
+ return self.result
+
+ def _write_last_failed(self):
+ if self.failed_tnames:
+ with open(FAILED_TESTS_FNAME, 'wt') as f:
+ for tname in self.failed_tnames:
+ f.write(tname + '\n')
+
+ def _save_result(self, result):
+ if not result.wasSuccessful():
+ for t in result.errors + result.failures:
+ tname = t[0].id()
+ self.failed_tnames.add(tname)
+
+ def _run(self, suite):
+ try:
+ result = super().run(suite)
+ except (KeyboardInterrupt, SystemExit):
+ result = self.runner.result
+ result.printErrors()
+ raise sys.exit(1)
+ else:
+ self._save_result(result)
+ return result
+
+ def _exit(self, success):
+ if success:
+ cprint("SUCCESS", "green", bold=True)
+ safe_rmpath(FAILED_TESTS_FNAME)
+ sys.exit(0)
+ else:
+ cprint("FAILED", "red", bold=True)
+ self._write_last_failed()
+ sys.exit(1)
+
+ def run(self, suite):
+ result = self._run(suite)
+ self._exit(result.wasSuccessful())
+
+
+class ParallelRunner(ColouredTextRunner):
+
+ @staticmethod
+ def _parallelize(suite):
+ def fdopen(fd, mode, *kwds):
+ stream = orig_fdopen(fd, mode)
+ atexit.register(stream.close)
+ return stream
+
+ # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
+ # https://github.com/cgoldberg/concurrencytest/issues/11
+ orig_fdopen = os.fdopen
+ concurrencytest.os.fdopen = fdopen
+ forker = concurrencytest.fork_for_tests(NWORKERS)
+ return concurrencytest.ConcurrentTestSuite(suite, forker)
+
+ @staticmethod
+ def _split_suite(suite):
+ serial = unittest.TestSuite()
+ parallel = unittest.TestSuite()
+ for test in suite:
+ if test.countTestCases() == 0:
+ continue
+ elif isinstance(test, unittest.TestSuite):
+ test_class = test._tests[0].__class__
+ elif isinstance(test, unittest.TestCase):
+ test_class = test
+ else:
+ raise TypeError("can't recognize type %r" % test)
+
+ if getattr(test_class, '_serialrun', False):
+ serial.addTest(test)
+ else:
+ parallel.addTest(test)
+ return (serial, parallel)
+
+ def run(self, suite):
+ ser_suite, par_suite = self._split_suite(suite)
+ par_suite = self._parallelize(par_suite)
+
+ # run parallel
+ cprint("starting parallel tests using %s workers" % NWORKERS,
+ "green", bold=True)
+ t = time.time()
+ par = self._run(par_suite)
+ par_elapsed = time.time() - t
+
+ # At this point we should have N zombies (the workers), which
+ # will disappear with wait().
+ orphans = psutil.Process().children()
+ gone, alive = psutil.wait_procs(orphans, timeout=1)
+ if alive:
+ cprint("alive processes %s" % alive, "red")
+ reap_children()
+
+ # run serial
+ t = time.time()
+ ser = self._run(ser_suite)
+ ser_elapsed = time.time() - t
+
+ # print
+ if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
+ par.printErrors() # print them again at the bottom
+ par_fails, par_errs, par_skips = map(len, (par.failures,
+ par.errors,
+ par.skipped))
+ ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
+ ser.errors,
+ ser.skipped))
+ print(textwrap.dedent("""
+ +----------+----------+----------+----------+----------+----------+
+ | | total | failures | errors | skipped | time |
+ +----------+----------+----------+----------+----------+----------+
+ | parallel | %3s | %3s | %3s | %3s | %.2fs |
+ +----------+----------+----------+----------+----------+----------+
+ | serial | %3s | %3s | %3s | %3s | %.2fs |
+ +----------+----------+----------+----------+----------+----------+
+ """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
+ ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
+ print("Ran %s tests in %.3fs using %s workers" % (
+ par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
+ ok = par.wasSuccessful() and ser.wasSuccessful()
+ self._exit(ok)
+
+
+def get_runner(parallel=False):
+ def warn(msg):
+ cprint(msg + " Running serial tests instead.", "red")
+ if parallel:
+ if psutil.WINDOWS:
+ warn("Can't run parallel tests on Windows.")
+ elif concurrencytest is None:
+ warn("concurrencytest module is not installed.")
+ elif NWORKERS == 1:
+ warn("Only 1 CPU available.")
+ else:
+ return ParallelRunner(verbosity=VERBOSITY)
+ return ColouredTextRunner(verbosity=VERBOSITY)
+
+
+# Used by test_*,py modules.
+def run_from_name(name):
+ if CI_TESTING:
+ print_sysinfo()
+ suite = TestLoader().from_name(name)
+ runner = get_runner()
+ runner.run(suite)
+
+
+def setup():
+ psutil._set_debug(True)
+
+
+def main():
+ setup()
+ usage = "python3 -m psutil.tests [opts] [test-name]"
+ parser = optparse.OptionParser(usage=usage, description="run unit tests")
+ parser.add_option("--last-failed",
+ action="store_true", default=False,
+ help="only run last failed tests")
+ parser.add_option("--parallel",
+ action="store_true", default=False,
+ help="run tests in parallel")
+ opts, args = parser.parse_args()
+
+ if not opts.last_failed:
+ safe_rmpath(FAILED_TESTS_FNAME)
+
+ # loader
+ loader = TestLoader()
+ if args:
+ if len(args) > 1:
+ parser.print_usage()
+ return sys.exit(1)
+ else:
+ suite = loader.from_name(args[0])
+ elif opts.last_failed:
+ suite = loader.last_failed()
+ else:
+ suite = loader.all()
+
+ if CI_TESTING:
+ print_sysinfo()
+ runner = get_runner(opts.parallel)
+ runner.run(suite)
+
+
+if __name__ == '__main__':
+ main()