summaryrefslogtreecommitdiffstats
path: root/lib/psutil/tests/test_misc.py
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/psutil/tests/test_misc.py
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'lib/psutil/tests/test_misc.py')
-rw-r--r--lib/psutil/tests/test_misc.py852
1 files changed, 852 insertions, 0 deletions
diff --git a/lib/psutil/tests/test_misc.py b/lib/psutil/tests/test_misc.py
new file mode 100644
index 0000000..e22789c
--- /dev/null
+++ b/lib/psutil/tests/test_misc.py
@@ -0,0 +1,852 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Miscellaneous tests.
+"""
+
+import ast
+import collections
+import errno
+import json
+import os
+import pickle
+import socket
+import stat
+import unittest
+
+import psutil
+import psutil.tests
+from psutil import LINUX
+from psutil import POSIX
+from psutil import WINDOWS
+from psutil._common import bcat
+from psutil._common import cat
+from psutil._common import debug
+from psutil._common import isfile_strict
+from psutil._common import memoize
+from psutil._common import memoize_when_activated
+from psutil._common import parse_environ_block
+from psutil._common import supports_ipv6
+from psutil._common import wrap_numbers
+from psutil._compat import PY3
+from psutil._compat import FileNotFoundError
+from psutil._compat import redirect_stderr
+from psutil.tests import APPVEYOR
+from psutil.tests import CI_TESTING
+from psutil.tests import HAS_BATTERY
+from psutil.tests import HAS_MEMORY_MAPS
+from psutil.tests import HAS_NET_IO_COUNTERS
+from psutil.tests import HAS_SENSORS_BATTERY
+from psutil.tests import HAS_SENSORS_FANS
+from psutil.tests import HAS_SENSORS_TEMPERATURES
+from psutil.tests import PYTHON_EXE
+from psutil.tests import ROOT_DIR
+from psutil.tests import SCRIPTS_DIR
+from psutil.tests import PsutilTestCase
+from psutil.tests import import_module_by_path
+from psutil.tests import mock
+from psutil.tests import reload_module
+from psutil.tests import sh
+
+
+# ===================================================================
+# --- Test classes' repr(), str(), ...
+# ===================================================================
+
+
+class TestSpecialMethods(PsutilTestCase):
+
+ def test_process__repr__(self, func=repr):
+ p = psutil.Process(self.spawn_testproc().pid)
+ r = func(p)
+ self.assertIn("psutil.Process", r)
+ self.assertIn("pid=%s" % p.pid, r)
+ self.assertIn("name='%s'" % str(p.name()),
+ r.replace("name=u'", "name='"))
+ self.assertIn("status=", r)
+ self.assertNotIn("exitcode=", r)
+ p.terminate()
+ p.wait()
+ r = func(p)
+ self.assertIn("status='terminated'", r)
+ self.assertIn("exitcode=", r)
+
+ with mock.patch.object(psutil.Process, "name",
+ side_effect=psutil.ZombieProcess(os.getpid())):
+ p = psutil.Process()
+ r = func(p)
+ self.assertIn("pid=%s" % p.pid, r)
+ self.assertIn("status='zombie'", r)
+ self.assertNotIn("name=", r)
+ with mock.patch.object(psutil.Process, "name",
+ side_effect=psutil.NoSuchProcess(os.getpid())):
+ p = psutil.Process()
+ r = func(p)
+ self.assertIn("pid=%s" % p.pid, r)
+ self.assertIn("terminated", r)
+ self.assertNotIn("name=", r)
+ with mock.patch.object(psutil.Process, "name",
+ side_effect=psutil.AccessDenied(os.getpid())):
+ p = psutil.Process()
+ r = func(p)
+ self.assertIn("pid=%s" % p.pid, r)
+ self.assertNotIn("name=", r)
+
+ def test_process__str__(self):
+ self.test_process__repr__(func=str)
+
+ def test_error__repr__(self):
+ self.assertEqual(repr(psutil.Error()), "psutil.Error()")
+
+ def test_error__str__(self):
+ self.assertEqual(str(psutil.Error()), "")
+
+ def test_no_such_process__repr__(self):
+ self.assertEqual(
+ repr(psutil.NoSuchProcess(321)),
+ "psutil.NoSuchProcess(pid=321, msg='process no longer exists')")
+ self.assertEqual(
+ repr(psutil.NoSuchProcess(321, name="name", msg="msg")),
+ "psutil.NoSuchProcess(pid=321, name='name', msg='msg')")
+
+ def test_no_such_process__str__(self):
+ self.assertEqual(
+ str(psutil.NoSuchProcess(321)),
+ "process no longer exists (pid=321)")
+ self.assertEqual(
+ str(psutil.NoSuchProcess(321, name="name", msg="msg")),
+ "msg (pid=321, name='name')")
+
+ def test_zombie_process__repr__(self):
+ self.assertEqual(
+ repr(psutil.ZombieProcess(321)),
+ 'psutil.ZombieProcess(pid=321, msg="PID still '
+ 'exists but it\'s a zombie")')
+ self.assertEqual(
+ repr(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")),
+ "psutil.ZombieProcess(pid=321, ppid=320, name='name', msg='foo')")
+
+ def test_zombie_process__str__(self):
+ self.assertEqual(
+ str(psutil.ZombieProcess(321)),
+ "PID still exists but it's a zombie (pid=321)")
+ self.assertEqual(
+ str(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")),
+ "foo (pid=321, ppid=320, name='name')")
+
+ def test_access_denied__repr__(self):
+ self.assertEqual(
+ repr(psutil.AccessDenied(321)),
+ "psutil.AccessDenied(pid=321)")
+ self.assertEqual(
+ repr(psutil.AccessDenied(321, name="name", msg="msg")),
+ "psutil.AccessDenied(pid=321, name='name', msg='msg')")
+
+ def test_access_denied__str__(self):
+ self.assertEqual(
+ str(psutil.AccessDenied(321)),
+ "(pid=321)")
+ self.assertEqual(
+ str(psutil.AccessDenied(321, name="name", msg="msg")),
+ "msg (pid=321, name='name')")
+
+ def test_timeout_expired__repr__(self):
+ self.assertEqual(
+ repr(psutil.TimeoutExpired(5)),
+ "psutil.TimeoutExpired(seconds=5, msg='timeout after 5 seconds')")
+ self.assertEqual(
+ repr(psutil.TimeoutExpired(5, pid=321, name="name")),
+ "psutil.TimeoutExpired(pid=321, name='name', seconds=5, "
+ "msg='timeout after 5 seconds')")
+
+ def test_timeout_expired__str__(self):
+ self.assertEqual(
+ str(psutil.TimeoutExpired(5)),
+ "timeout after 5 seconds")
+ self.assertEqual(
+ str(psutil.TimeoutExpired(5, pid=321, name="name")),
+ "timeout after 5 seconds (pid=321, name='name')")
+
+ def test_process__eq__(self):
+ p1 = psutil.Process()
+ p2 = psutil.Process()
+ self.assertEqual(p1, p2)
+ p2._ident = (0, 0)
+ self.assertNotEqual(p1, p2)
+ self.assertNotEqual(p1, 'foo')
+
+ def test_process__hash__(self):
+ s = set([psutil.Process(), psutil.Process()])
+ self.assertEqual(len(s), 1)
+
+
+# ===================================================================
+# --- Misc, generic, corner cases
+# ===================================================================
+
+
+class TestMisc(PsutilTestCase):
+
+ def test__all__(self):
+ dir_psutil = dir(psutil)
+ for name in dir_psutil:
+ if name in ('long', 'tests', 'test', 'PermissionError',
+ 'ProcessLookupError'):
+ continue
+ if not name.startswith('_'):
+ try:
+ __import__(name)
+ except ImportError:
+ if name not in psutil.__all__:
+ fun = getattr(psutil, name)
+ if fun is None:
+ continue
+ if (fun.__doc__ is not None and
+ 'deprecated' not in fun.__doc__.lower()):
+ raise self.fail('%r not in psutil.__all__' % name)
+
+ # Import 'star' will break if __all__ is inconsistent, see:
+ # https://github.com/giampaolo/psutil/issues/656
+ # Can't do `from psutil import *` as it won't work on python 3
+ # so we simply iterate over __all__.
+ for name in psutil.__all__:
+ self.assertIn(name, dir_psutil)
+
+ def test_version(self):
+ self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
+ psutil.__version__)
+
+ def test_process_as_dict_no_new_names(self):
+ # See https://github.com/giampaolo/psutil/issues/813
+ p = psutil.Process()
+ p.foo = '1'
+ self.assertNotIn('foo', p.as_dict())
+
+ def test_serialization(self):
+ def check(ret):
+ if json is not None:
+ json.loads(json.dumps(ret))
+ a = pickle.dumps(ret)
+ b = pickle.loads(a)
+ self.assertEqual(ret, b)
+
+ check(psutil.Process().as_dict())
+ check(psutil.virtual_memory())
+ check(psutil.swap_memory())
+ check(psutil.cpu_times())
+ check(psutil.cpu_times_percent(interval=0))
+ check(psutil.net_io_counters())
+ if LINUX and not os.path.exists('/proc/diskstats'):
+ pass
+ else:
+ if not APPVEYOR:
+ check(psutil.disk_io_counters())
+ check(psutil.disk_partitions())
+ check(psutil.disk_usage(os.getcwd()))
+ check(psutil.users())
+
+ # XXX: https://github.com/pypa/setuptools/pull/2896
+ @unittest.skipIf(APPVEYOR, "temporarily disabled due to setuptools bug")
+ def test_setup_script(self):
+ setup_py = os.path.join(ROOT_DIR, 'setup.py')
+ if CI_TESTING and not os.path.exists(setup_py):
+ return self.skipTest("can't find setup.py")
+ module = import_module_by_path(setup_py)
+ self.assertRaises(SystemExit, module.setup)
+ self.assertEqual(module.get_version(), psutil.__version__)
+
+ def test_ad_on_process_creation(self):
+ # We are supposed to be able to instantiate Process also in case
+ # of zombie processes or access denied.
+ with mock.patch.object(psutil.Process, 'create_time',
+ side_effect=psutil.AccessDenied) as meth:
+ psutil.Process()
+ assert meth.called
+ with mock.patch.object(psutil.Process, 'create_time',
+ side_effect=psutil.ZombieProcess(1)) as meth:
+ psutil.Process()
+ assert meth.called
+ with mock.patch.object(psutil.Process, 'create_time',
+ side_effect=ValueError) as meth:
+ with self.assertRaises(ValueError):
+ psutil.Process()
+ assert meth.called
+
+ def test_sanity_version_check(self):
+ # see: https://github.com/giampaolo/psutil/issues/564
+ with mock.patch(
+ "psutil._psplatform.cext.version", return_value="0.0.0"):
+ with self.assertRaises(ImportError) as cm:
+ reload_module(psutil)
+ self.assertIn("version conflict", str(cm.exception).lower())
+
+
+# ===================================================================
+# --- psutil/_common.py utils
+# ===================================================================
+
+
+class TestCommonModule(PsutilTestCase):
+
+ def test_memoize(self):
+ @memoize
+ def foo(*args, **kwargs):
+ """foo docstring"""
+ calls.append(None)
+ return (args, kwargs)
+
+ calls = []
+ # no args
+ for x in range(2):
+ ret = foo()
+ expected = ((), {})
+ self.assertEqual(ret, expected)
+ self.assertEqual(len(calls), 1)
+ # with args
+ for x in range(2):
+ ret = foo(1)
+ expected = ((1, ), {})
+ self.assertEqual(ret, expected)
+ self.assertEqual(len(calls), 2)
+ # with args + kwargs
+ for x in range(2):
+ ret = foo(1, bar=2)
+ expected = ((1, ), {'bar': 2})
+ self.assertEqual(ret, expected)
+ self.assertEqual(len(calls), 3)
+ # clear cache
+ foo.cache_clear()
+ ret = foo()
+ expected = ((), {})
+ self.assertEqual(ret, expected)
+ self.assertEqual(len(calls), 4)
+ # docstring
+ self.assertEqual(foo.__doc__, "foo docstring")
+
+ def test_memoize_when_activated(self):
+ class Foo:
+
+ @memoize_when_activated
+ def foo(self):
+ calls.append(None)
+
+ f = Foo()
+ calls = []
+ f.foo()
+ f.foo()
+ self.assertEqual(len(calls), 2)
+
+ # activate
+ calls = []
+ f.foo.cache_activate(f)
+ f.foo()
+ f.foo()
+ self.assertEqual(len(calls), 1)
+
+ # deactivate
+ calls = []
+ f.foo.cache_deactivate(f)
+ f.foo()
+ f.foo()
+ self.assertEqual(len(calls), 2)
+
+ def test_parse_environ_block(self):
+ def k(s):
+ return s.upper() if WINDOWS else s
+
+ self.assertEqual(parse_environ_block("a=1\0"),
+ {k("a"): "1"})
+ self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
+ {k("a"): "1", k("b"): "2"})
+ self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
+ {k("a"): "1", k("b"): ""})
+ # ignore everything after \0\0
+ self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
+ {k("a"): "1", k("b"): "2"})
+ # ignore everything that is not an assignment
+ self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
+ self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
+ # do not fail if the block is incomplete
+ self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})
+
+ def test_supports_ipv6(self):
+ self.addCleanup(supports_ipv6.cache_clear)
+ if supports_ipv6():
+ with mock.patch('psutil._common.socket') as s:
+ s.has_ipv6 = False
+ supports_ipv6.cache_clear()
+ assert not supports_ipv6()
+
+ supports_ipv6.cache_clear()
+ with mock.patch('psutil._common.socket.socket',
+ side_effect=socket.error) as s:
+ assert not supports_ipv6()
+ assert s.called
+
+ supports_ipv6.cache_clear()
+ with mock.patch('psutil._common.socket.socket',
+ side_effect=socket.gaierror) as s:
+ assert not supports_ipv6()
+ supports_ipv6.cache_clear()
+ assert s.called
+
+ supports_ipv6.cache_clear()
+ with mock.patch('psutil._common.socket.socket.bind',
+ side_effect=socket.gaierror) as s:
+ assert not supports_ipv6()
+ supports_ipv6.cache_clear()
+ assert s.called
+ else:
+ with self.assertRaises(socket.error):
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ try:
+ sock.bind(("::1", 0))
+ finally:
+ sock.close()
+
+ def test_isfile_strict(self):
+ this_file = os.path.abspath(__file__)
+ assert isfile_strict(this_file)
+ assert not isfile_strict(os.path.dirname(this_file))
+ with mock.patch('psutil._common.os.stat',
+ side_effect=OSError(errno.EPERM, "foo")):
+ self.assertRaises(OSError, isfile_strict, this_file)
+ with mock.patch('psutil._common.os.stat',
+ side_effect=OSError(errno.EACCES, "foo")):
+ self.assertRaises(OSError, isfile_strict, this_file)
+ with mock.patch('psutil._common.os.stat',
+ side_effect=OSError(errno.ENOENT, "foo")):
+ assert not isfile_strict(this_file)
+ with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
+ assert not isfile_strict(this_file)
+
+ def test_debug(self):
+ if PY3:
+ from io import StringIO
+ else:
+ from StringIO import StringIO
+
+ with redirect_stderr(StringIO()) as f:
+ debug("hello")
+ msg = f.getvalue()
+ assert msg.startswith("psutil-debug"), msg
+ self.assertIn("hello", msg)
+ self.assertIn(__file__.replace('.pyc', '.py'), msg)
+
+ # supposed to use repr(exc)
+ with redirect_stderr(StringIO()) as f:
+ debug(ValueError("this is an error"))
+ msg = f.getvalue()
+ self.assertIn("ignoring ValueError", msg)
+ self.assertIn("'this is an error'", msg)
+
+ # supposed to use str(exc), because of extra info about file name
+ with redirect_stderr(StringIO()) as f:
+ exc = OSError(2, "no such file")
+ exc.filename = "/foo"
+ debug(exc)
+ msg = f.getvalue()
+ self.assertIn("no such file", msg)
+ self.assertIn("/foo", msg)
+
+ def test_cat_bcat(self):
+ testfn = self.get_testfn()
+ with open(testfn, "wt") as f:
+ f.write("foo")
+ self.assertEqual(cat(testfn), "foo")
+ self.assertEqual(bcat(testfn), b"foo")
+ self.assertRaises(FileNotFoundError, cat, testfn + '-invalid')
+ self.assertRaises(FileNotFoundError, bcat, testfn + '-invalid')
+ self.assertEqual(cat(testfn + '-invalid', fallback="bar"), "bar")
+ self.assertEqual(bcat(testfn + '-invalid', fallback="bar"), "bar")
+
+
+# ===================================================================
+# --- Tests for wrap_numbers() function.
+# ===================================================================
+
+
+nt = collections.namedtuple('foo', 'a b c')
+
+
+class TestWrapNumbers(PsutilTestCase):
+
+ def setUp(self):
+ wrap_numbers.cache_clear()
+
+ tearDown = setUp
+
+ def test_first_call(self):
+ input = {'disk1': nt(5, 5, 5)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+ def test_input_hasnt_changed(self):
+ input = {'disk1': nt(5, 5, 5)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+ def test_increase_but_no_wrap(self):
+ input = {'disk1': nt(5, 5, 5)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ input = {'disk1': nt(10, 15, 20)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ input = {'disk1': nt(20, 25, 30)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ input = {'disk1': nt(20, 25, 30)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+ def test_wrap(self):
+ # let's say 100 is the threshold
+ input = {'disk1': nt(100, 100, 100)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ # first wrap restarts from 10
+ input = {'disk1': nt(100, 100, 10)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(100, 100, 110)})
+ # then it remains the same
+ input = {'disk1': nt(100, 100, 10)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(100, 100, 110)})
+ # then it goes up
+ input = {'disk1': nt(100, 100, 90)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(100, 100, 190)})
+ # then it wraps again
+ input = {'disk1': nt(100, 100, 20)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(100, 100, 210)})
+ # and remains the same
+ input = {'disk1': nt(100, 100, 20)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(100, 100, 210)})
+ # now wrap another num
+ input = {'disk1': nt(50, 100, 20)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(150, 100, 210)})
+ # and again
+ input = {'disk1': nt(40, 100, 20)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(190, 100, 210)})
+ # keep it the same
+ input = {'disk1': nt(40, 100, 20)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(190, 100, 210)})
+
+ def test_changing_keys(self):
+ # Emulate a case where the second call to disk_io()
+ # (or whatever) provides a new disk, then the new disk
+ # disappears on the third call.
+ input = {'disk1': nt(5, 5, 5)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ input = {'disk1': nt(5, 5, 5),
+ 'disk2': nt(7, 7, 7)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ input = {'disk1': nt(8, 8, 8)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+ def test_changing_keys_w_wrap(self):
+ input = {'disk1': nt(50, 50, 50),
+ 'disk2': nt(100, 100, 100)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ # disk 2 wraps
+ input = {'disk1': nt(50, 50, 50),
+ 'disk2': nt(100, 100, 10)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(50, 50, 50),
+ 'disk2': nt(100, 100, 110)})
+ # disk 2 disappears
+ input = {'disk1': nt(50, 50, 50)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+ # then it appears again; the old wrap is supposed to be
+ # gone.
+ input = {'disk1': nt(50, 50, 50),
+ 'disk2': nt(100, 100, 100)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ # remains the same
+ input = {'disk1': nt(50, 50, 50),
+ 'disk2': nt(100, 100, 100)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+ # and then wraps again
+ input = {'disk1': nt(50, 50, 50),
+ 'disk2': nt(100, 100, 10)}
+ self.assertEqual(wrap_numbers(input, 'disk_io'),
+ {'disk1': nt(50, 50, 50),
+ 'disk2': nt(100, 100, 110)})
+
+ def test_real_data(self):
+ d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
+ 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
+ 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
+ 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
+ self.assertEqual(wrap_numbers(d, 'disk_io'), d)
+ self.assertEqual(wrap_numbers(d, 'disk_io'), d)
+ # decrease this ↓
+ d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
+ 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
+ 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
+ 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
+ out = wrap_numbers(d, 'disk_io')
+ self.assertEqual(out['nvme0n1'][0], 400)
+
+ # --- cache tests
+
+ def test_cache_first_call(self):
+ input = {'disk1': nt(5, 5, 5)}
+ wrap_numbers(input, 'disk_io')
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(cache[0], {'disk_io': input})
+ self.assertEqual(cache[1], {'disk_io': {}})
+ self.assertEqual(cache[2], {'disk_io': {}})
+
+ def test_cache_call_twice(self):
+ input = {'disk1': nt(5, 5, 5)}
+ wrap_numbers(input, 'disk_io')
+ input = {'disk1': nt(10, 10, 10)}
+ wrap_numbers(input, 'disk_io')
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(cache[0], {'disk_io': input})
+ self.assertEqual(
+ cache[1],
+ {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
+ self.assertEqual(cache[2], {'disk_io': {}})
+
+ def test_cache_wrap(self):
+ # let's say 100 is the threshold
+ input = {'disk1': nt(100, 100, 100)}
+ wrap_numbers(input, 'disk_io')
+
+ # first wrap restarts from 10
+ input = {'disk1': nt(100, 100, 10)}
+ wrap_numbers(input, 'disk_io')
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(cache[0], {'disk_io': input})
+ self.assertEqual(
+ cache[1],
+ {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}})
+ self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
+
+ def assert_():
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(
+ cache[1],
+ {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
+ ('disk1', 2): 100}})
+ self.assertEqual(cache[2],
+ {'disk_io': {'disk1': set([('disk1', 2)])}})
+
+ # then it remains the same
+ input = {'disk1': nt(100, 100, 10)}
+ wrap_numbers(input, 'disk_io')
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(cache[0], {'disk_io': input})
+ assert_()
+
+ # then it goes up
+ input = {'disk1': nt(100, 100, 90)}
+ wrap_numbers(input, 'disk_io')
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(cache[0], {'disk_io': input})
+ assert_()
+
+ # then it wraps again
+ input = {'disk1': nt(100, 100, 20)}
+ wrap_numbers(input, 'disk_io')
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(cache[0], {'disk_io': input})
+ self.assertEqual(
+ cache[1],
+ {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
+ self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
+
+ def test_cache_changing_keys(self):
+ input = {'disk1': nt(5, 5, 5)}
+ wrap_numbers(input, 'disk_io')
+ input = {'disk1': nt(5, 5, 5),
+ 'disk2': nt(7, 7, 7)}
+ wrap_numbers(input, 'disk_io')
+ cache = wrap_numbers.cache_info()
+ self.assertEqual(cache[0], {'disk_io': input})
+ self.assertEqual(
+ cache[1],
+ {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
+ self.assertEqual(cache[2], {'disk_io': {}})
+
+ def test_cache_clear(self):
+ input = {'disk1': nt(5, 5, 5)}
+ wrap_numbers(input, 'disk_io')
+ wrap_numbers(input, 'disk_io')
+ wrap_numbers.cache_clear('disk_io')
+ self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
+ wrap_numbers.cache_clear('disk_io')
+ wrap_numbers.cache_clear('?!?')
+
+ @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
+ def test_cache_clear_public_apis(self):
+ if not psutil.disk_io_counters() or not psutil.net_io_counters():
+ return self.skipTest("no disks or NICs available")
+ psutil.disk_io_counters()
+ psutil.net_io_counters()
+ caches = wrap_numbers.cache_info()
+ for cache in caches:
+ self.assertIn('psutil.disk_io_counters', cache)
+ self.assertIn('psutil.net_io_counters', cache)
+
+ psutil.disk_io_counters.cache_clear()
+ caches = wrap_numbers.cache_info()
+ for cache in caches:
+ self.assertIn('psutil.net_io_counters', cache)
+ self.assertNotIn('psutil.disk_io_counters', cache)
+
+ psutil.net_io_counters.cache_clear()
+ caches = wrap_numbers.cache_info()
+ self.assertEqual(caches, ({}, {}, {}))
+
+
+# ===================================================================
+# --- Example script tests
+# ===================================================================
+
+
+@unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
+ "can't locate scripts directory")
+class TestScripts(PsutilTestCase):
+ """Tests for scripts in the "scripts" directory."""
+
+ @staticmethod
+ def assert_stdout(exe, *args, **kwargs):
+ exe = '%s' % os.path.join(SCRIPTS_DIR, exe)
+ cmd = [PYTHON_EXE, exe]
+ for arg in args:
+ cmd.append(arg)
+ try:
+ out = sh(cmd, **kwargs).strip()
+ except RuntimeError as err:
+ if 'AccessDenied' in str(err):
+ return str(err)
+ else:
+ raise
+ assert out, out
+ return out
+
+ @staticmethod
+ def assert_syntax(exe, args=None):
+ exe = os.path.join(SCRIPTS_DIR, exe)
+ if PY3:
+ f = open(exe, 'rt', encoding='utf8')
+ else:
+ f = open(exe, 'rt')
+ with f:
+ src = f.read()
+ ast.parse(src)
+
+ def test_coverage(self):
+ # make sure all example scripts have a test method defined
+ meths = dir(self)
+ for name in os.listdir(SCRIPTS_DIR):
+ if name.endswith('.py'):
+ if 'test_' + os.path.splitext(name)[0] not in meths:
+ # self.assert_stdout(name)
+ raise self.fail('no test defined for %r script'
+ % os.path.join(SCRIPTS_DIR, name))
+
+ @unittest.skipIf(not POSIX, "POSIX only")
+ def test_executable(self):
+ for root, dirs, files in os.walk(SCRIPTS_DIR):
+ for file in files:
+ if file.endswith('.py'):
+ path = os.path.join(root, file)
+ if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
+ raise self.fail('%r is not executable' % path)
+
+ def test_disk_usage(self):
+ self.assert_stdout('disk_usage.py')
+
+ def test_free(self):
+ self.assert_stdout('free.py')
+
+ def test_meminfo(self):
+ self.assert_stdout('meminfo.py')
+
+ def test_procinfo(self):
+ self.assert_stdout('procinfo.py', str(os.getpid()))
+
+ @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
+ def test_who(self):
+ self.assert_stdout('who.py')
+
+ def test_ps(self):
+ self.assert_stdout('ps.py')
+
+ def test_pstree(self):
+ self.assert_stdout('pstree.py')
+
+ def test_netstat(self):
+ self.assert_stdout('netstat.py')
+
+ def test_ifconfig(self):
+ self.assert_stdout('ifconfig.py')
+
+ @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
+ def test_pmap(self):
+ self.assert_stdout('pmap.py', str(os.getpid()))
+
+ def test_procsmem(self):
+ if 'uss' not in psutil.Process().memory_full_info()._fields:
+ raise self.skipTest("not supported")
+ self.assert_stdout('procsmem.py')
+
+ def test_killall(self):
+ self.assert_syntax('killall.py')
+
+ def test_nettop(self):
+ self.assert_syntax('nettop.py')
+
+ def test_top(self):
+ self.assert_syntax('top.py')
+
+ def test_iotop(self):
+ self.assert_syntax('iotop.py')
+
+ def test_pidof(self):
+ output = self.assert_stdout('pidof.py', psutil.Process().name())
+ self.assertIn(str(os.getpid()), output)
+
+ @unittest.skipIf(not WINDOWS, "WINDOWS only")
+ def test_winservices(self):
+ self.assert_stdout('winservices.py')
+
+ def test_cpu_distribution(self):
+ self.assert_syntax('cpu_distribution.py')
+
+ @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
+ def test_temperatures(self):
+ if not psutil.sensors_temperatures():
+ self.skipTest("no temperatures")
+ self.assert_stdout('temperatures.py')
+
+ @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
+ def test_fans(self):
+ if not psutil.sensors_fans():
+ self.skipTest("no fans")
+ self.assert_stdout('fans.py')
+
+ @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
+ @unittest.skipIf(not HAS_BATTERY, "no battery")
+ def test_battery(self):
+ self.assert_stdout('battery.py')
+
+ @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
+ @unittest.skipIf(not HAS_BATTERY, "no battery")
+ def test_sensors(self):
+ self.assert_stdout('sensors.py')
+
+
+if __name__ == '__main__':
+ from psutil.tests.runner import run_from_name
+ run_from_name(__file__)