summaryrefslogtreecommitdiffstats
path: root/lib/psutil/tests/test_posix.py
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/psutil/tests/test_posix.py
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'lib/psutil/tests/test_posix.py')
-rw-r--r--lib/psutil/tests/test_posix.py432
1 files changed, 432 insertions, 0 deletions
diff --git a/lib/psutil/tests/test_posix.py b/lib/psutil/tests/test_posix.py
new file mode 100644
index 0000000..d873223
--- /dev/null
+++ b/lib/psutil/tests/test_posix.py
@@ -0,0 +1,432 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""POSIX specific tests."""
+
+import datetime
+import errno
+import os
+import re
+import subprocess
+import time
+import unittest
+
+import psutil
+from psutil import AIX
+from psutil import BSD
+from psutil import LINUX
+from psutil import MACOS
+from psutil import OPENBSD
+from psutil import POSIX
+from psutil import SUNOS
+from psutil.tests import CI_TESTING
+from psutil.tests import HAS_NET_IO_COUNTERS
+from psutil.tests import PYTHON_EXE
+from psutil.tests import PsutilTestCase
+from psutil.tests import mock
+from psutil.tests import retry_on_failure
+from psutil.tests import sh
+from psutil.tests import skip_on_access_denied
+from psutil.tests import spawn_testproc
+from psutil.tests import terminate
+from psutil.tests import which
+
+
+if POSIX:
+ import mmap
+ import resource
+
+ from psutil._psutil_posix import getpagesize
+
+
+def ps(fmt, pid=None):
+ """
+ Wrapper for calling the ps command with a little bit of cross-platform
+ support for a narrow range of features.
+ """
+
+ cmd = ['ps']
+
+ if LINUX:
+ cmd.append('--no-headers')
+
+ if pid is not None:
+ cmd.extend(['-p', str(pid)])
+ else:
+ if SUNOS or AIX:
+ cmd.append('-A')
+ else:
+ cmd.append('ax')
+
+ if SUNOS:
+ fmt_map = set(('command', 'comm', 'start', 'stime'))
+ fmt = fmt_map.get(fmt, fmt)
+
+ cmd.extend(['-o', fmt])
+
+ output = sh(cmd)
+
+ if LINUX:
+ output = output.splitlines()
+ else:
+ output = output.splitlines()[1:]
+
+ all_output = []
+ for line in output:
+ line = line.strip()
+
+ try:
+ line = int(line)
+ except ValueError:
+ pass
+
+ all_output.append(line)
+
+ if pid is None:
+ return all_output
+ else:
+ return all_output[0]
+
+# ps "-o" field names differ wildly between platforms.
+# "comm" means "only executable name" but is not available on BSD platforms.
+# "args" means "command with all its arguments", and is also not available
+# on BSD platforms.
+# "command" is like "args" on most platforms, but like "comm" on AIX,
+# and not available on SUNOS.
+# so for the executable name we can use "comm" on Solaris and split "command"
+# on other platforms.
+# to get the cmdline (with args) we have to use "args" on AIX and
+# Solaris, and can use "command" on all others.
+
+
+def ps_name(pid):
+ field = "command"
+ if SUNOS:
+ field = "comm"
+ return ps(field, pid).split()[0]
+
+
+def ps_args(pid):
+ field = "command"
+ if AIX or SUNOS:
+ field = "args"
+ return ps(field, pid)
+
+
+def ps_rss(pid):
+ field = "rss"
+ if AIX:
+ field = "rssize"
+ return ps(field, pid)
+
+
+def ps_vsz(pid):
+ field = "vsz"
+ if AIX:
+ field = "vsize"
+ return ps(field, pid)
+
+
+@unittest.skipIf(not POSIX, "POSIX only")
+class TestProcess(PsutilTestCase):
+ """Compare psutil results against 'ps' command line utility (mainly)."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = spawn_testproc([PYTHON_EXE, "-E", "-O"],
+ stdin=subprocess.PIPE).pid
+
+ @classmethod
+ def tearDownClass(cls):
+ terminate(cls.pid)
+
+ def test_ppid(self):
+ ppid_ps = ps('ppid', self.pid)
+ ppid_psutil = psutil.Process(self.pid).ppid()
+ self.assertEqual(ppid_ps, ppid_psutil)
+
+ def test_uid(self):
+ uid_ps = ps('uid', self.pid)
+ uid_psutil = psutil.Process(self.pid).uids().real
+ self.assertEqual(uid_ps, uid_psutil)
+
+ def test_gid(self):
+ gid_ps = ps('rgid', self.pid)
+ gid_psutil = psutil.Process(self.pid).gids().real
+ self.assertEqual(gid_ps, gid_psutil)
+
+ def test_username(self):
+ username_ps = ps('user', self.pid)
+ username_psutil = psutil.Process(self.pid).username()
+ self.assertEqual(username_ps, username_psutil)
+
+ def test_username_no_resolution(self):
+ # Emulate a case where the system can't resolve the uid to
+ # a username in which case psutil is supposed to return
+ # the stringified uid.
+ p = psutil.Process()
+ with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun:
+ self.assertEqual(p.username(), str(p.uids().real))
+ assert fun.called
+
+ @skip_on_access_denied()
+ @retry_on_failure()
+ def test_rss_memory(self):
+ # give python interpreter some time to properly initialize
+ # so that the results are the same
+ time.sleep(0.1)
+ rss_ps = ps_rss(self.pid)
+ rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
+ self.assertEqual(rss_ps, rss_psutil)
+
+ @skip_on_access_denied()
+ @retry_on_failure()
+ def test_vsz_memory(self):
+ # give python interpreter some time to properly initialize
+ # so that the results are the same
+ time.sleep(0.1)
+ vsz_ps = ps_vsz(self.pid)
+ vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
+ self.assertEqual(vsz_ps, vsz_psutil)
+
+ def test_name(self):
+ name_ps = ps_name(self.pid)
+ # remove path if there is any, from the command
+ name_ps = os.path.basename(name_ps).lower()
+ name_psutil = psutil.Process(self.pid).name().lower()
+ # ...because of how we calculate PYTHON_EXE; on MACOS this may
+ # be "pythonX.Y".
+ name_ps = re.sub(r"\d.\d", "", name_ps)
+ name_psutil = re.sub(r"\d.\d", "", name_psutil)
+ # ...may also be "python.X"
+ name_ps = re.sub(r"\d", "", name_ps)
+ name_psutil = re.sub(r"\d", "", name_psutil)
+ self.assertEqual(name_ps, name_psutil)
+
+ def test_name_long(self):
+ # On UNIX the kernel truncates the name to the first 15
+ # characters. In such a case psutil tries to determine the
+ # full name from the cmdline.
+ name = "long-program-name"
+ cmdline = ["long-program-name-extended", "foo", "bar"]
+ with mock.patch("psutil._psplatform.Process.name",
+ return_value=name):
+ with mock.patch("psutil._psplatform.Process.cmdline",
+ return_value=cmdline):
+ p = psutil.Process()
+ self.assertEqual(p.name(), "long-program-name-extended")
+
+ def test_name_long_cmdline_ad_exc(self):
+ # Same as above but emulates a case where cmdline() raises
+ # AccessDenied in which case psutil is supposed to return
+ # the truncated name instead of crashing.
+ name = "long-program-name"
+ with mock.patch("psutil._psplatform.Process.name",
+ return_value=name):
+ with mock.patch("psutil._psplatform.Process.cmdline",
+ side_effect=psutil.AccessDenied(0, "")):
+ p = psutil.Process()
+ self.assertEqual(p.name(), "long-program-name")
+
+ def test_name_long_cmdline_nsp_exc(self):
+ # Same as above but emulates a case where cmdline() raises NSP
+ # which is supposed to propagate.
+ name = "long-program-name"
+ with mock.patch("psutil._psplatform.Process.name",
+ return_value=name):
+ with mock.patch("psutil._psplatform.Process.cmdline",
+ side_effect=psutil.NoSuchProcess(0, "")):
+ p = psutil.Process()
+ self.assertRaises(psutil.NoSuchProcess, p.name)
+
+ @unittest.skipIf(MACOS or BSD, 'ps -o start not available')
+ def test_create_time(self):
+ time_ps = ps('start', self.pid)
+ time_psutil = psutil.Process(self.pid).create_time()
+ time_psutil_tstamp = datetime.datetime.fromtimestamp(
+ time_psutil).strftime("%H:%M:%S")
+ # sometimes ps shows the time rounded up instead of down, so we check
+ # for both possible values
+ round_time_psutil = round(time_psutil)
+ round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
+ round_time_psutil).strftime("%H:%M:%S")
+ self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
+
+ def test_exe(self):
+ ps_pathname = ps_name(self.pid)
+ psutil_pathname = psutil.Process(self.pid).exe()
+ try:
+ self.assertEqual(ps_pathname, psutil_pathname)
+ except AssertionError:
+ # certain platforms such as BSD are more accurate returning:
+ # "/usr/local/bin/python2.7"
+ # ...instead of:
+ # "/usr/local/bin/python"
+ # We do not want to consider this difference in accuracy
+ # an error.
+ adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
+ self.assertEqual(ps_pathname, adjusted_ps_pathname)
+
+ # On macOS the official python installer exposes a python wrapper that
+ # executes a python executable hidden inside an application bundle inside
+ # the Python framework.
+ # There's a race condition between the ps call & the psutil call below
+ # depending on the completion of the execve call so let's retry on failure
+ @retry_on_failure()
+ def test_cmdline(self):
+ ps_cmdline = ps_args(self.pid)
+ psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
+ self.assertEqual(ps_cmdline, psutil_cmdline)
+
+ # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an
+ # incorrect value (20); the real deal is getpriority(2) which
+ # returns 0; psutil relies on it, see:
+ # https://github.com/giampaolo/psutil/issues/1082
+ # AIX has the same issue
+ @unittest.skipIf(SUNOS, "not reliable on SUNOS")
+ @unittest.skipIf(AIX, "not reliable on AIX")
+ def test_nice(self):
+ ps_nice = ps('nice', self.pid)
+ psutil_nice = psutil.Process().nice()
+ self.assertEqual(ps_nice, psutil_nice)
+
+
+@unittest.skipIf(not POSIX, "POSIX only")
+class TestSystemAPIs(PsutilTestCase):
+ """Test some system APIs."""
+
+ @retry_on_failure()
+ def test_pids(self):
+ # Note: this test might fail if the OS is starting/killing
+ # other processes in the meantime
+ pids_ps = sorted(ps("pid"))
+ pids_psutil = psutil.pids()
+
+ # on MACOS and OPENBSD ps doesn't show pid 0
+ if MACOS or OPENBSD and 0 not in pids_ps:
+ pids_ps.insert(0, 0)
+
+ # There will often be one more process in pids_ps for ps itself
+ if len(pids_ps) - len(pids_psutil) > 1:
+ difference = [x for x in pids_psutil if x not in pids_ps] + \
+ [x for x in pids_ps if x not in pids_psutil]
+ raise self.fail("difference: " + str(difference))
+
+ # for some reason ifconfig -a does not report all interfaces
+ # returned by psutil
+ @unittest.skipIf(SUNOS, "unreliable on SUNOS")
+ @unittest.skipIf(not which('ifconfig'), "no ifconfig cmd")
+ @unittest.skipIf(not HAS_NET_IO_COUNTERS, "not supported")
+ def test_nic_names(self):
+ output = sh("ifconfig -a")
+ for nic in psutil.net_io_counters(pernic=True).keys():
+ for line in output.split():
+ if line.startswith(nic):
+ break
+ else:
+ raise self.fail(
+ "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
+ nic, output))
+
+ @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
+ @retry_on_failure()
+ def test_users(self):
+ out = sh("who")
+ if not out.strip():
+ raise self.skipTest("no users on this system")
+ lines = out.split('\n')
+ users = [x.split()[0] for x in lines]
+ terminals = [x.split()[1] for x in lines]
+ self.assertEqual(len(users), len(psutil.users()))
+ for u in psutil.users():
+ self.assertIn(u.name, users)
+ self.assertIn(u.terminal, terminals)
+
+ def test_pid_exists_let_raise(self):
+ # According to "man 2 kill" possible error values for kill
+ # are (EINVAL, EPERM, ESRCH). Test that any other errno
+ # results in an exception.
+ with mock.patch("psutil._psposix.os.kill",
+ side_effect=OSError(errno.EBADF, "")) as m:
+ self.assertRaises(OSError, psutil._psposix.pid_exists, os.getpid())
+ assert m.called
+
+ def test_os_waitpid_let_raise(self):
+ # os.waitpid() is supposed to catch EINTR and ECHILD only.
+ # Test that any other errno results in an exception.
+ with mock.patch("psutil._psposix.os.waitpid",
+ side_effect=OSError(errno.EBADF, "")) as m:
+ self.assertRaises(OSError, psutil._psposix.wait_pid, os.getpid())
+ assert m.called
+
+ def test_os_waitpid_eintr(self):
+ # os.waitpid() is supposed to "retry" on EINTR.
+ with mock.patch("psutil._psposix.os.waitpid",
+ side_effect=OSError(errno.EINTR, "")) as m:
+ self.assertRaises(
+ psutil._psposix.TimeoutExpired,
+ psutil._psposix.wait_pid, os.getpid(), timeout=0.01)
+ assert m.called
+
+ def test_os_waitpid_bad_ret_status(self):
+ # Simulate os.waitpid() returning a bad status.
+ with mock.patch("psutil._psposix.os.waitpid",
+ return_value=(1, -1)) as m:
+ self.assertRaises(ValueError,
+ psutil._psposix.wait_pid, os.getpid())
+ assert m.called
+
+ # AIX can return '-' in df output instead of numbers, e.g. for /proc
+ @unittest.skipIf(AIX, "unreliable on AIX")
+ @retry_on_failure()
+ def test_disk_usage(self):
+ def df(device):
+ out = sh("df -k %s" % device).strip()
+ line = out.split('\n')[1]
+ fields = line.split()
+ total = int(fields[1]) * 1024
+ used = int(fields[2]) * 1024
+ free = int(fields[3]) * 1024
+ percent = float(fields[4].replace('%', ''))
+ return (total, used, free, percent)
+
+ tolerance = 4 * 1024 * 1024 # 4MB
+ for part in psutil.disk_partitions(all=False):
+ usage = psutil.disk_usage(part.mountpoint)
+ try:
+ total, used, free, percent = df(part.device)
+ except RuntimeError as err:
+ # see:
+ # https://travis-ci.org/giampaolo/psutil/jobs/138338464
+ # https://travis-ci.org/giampaolo/psutil/jobs/138343361
+ err = str(err).lower()
+ if "no such file or directory" in err or \
+ "raw devices not supported" in err or \
+ "permission denied" in err:
+ continue
+ else:
+ raise
+ else:
+ self.assertAlmostEqual(usage.total, total, delta=tolerance)
+ self.assertAlmostEqual(usage.used, used, delta=tolerance)
+ self.assertAlmostEqual(usage.free, free, delta=tolerance)
+ self.assertAlmostEqual(usage.percent, percent, delta=1)
+
+
+@unittest.skipIf(not POSIX, "POSIX only")
+class TestMisc(PsutilTestCase):
+
+ def test_getpagesize(self):
+ pagesize = getpagesize()
+ self.assertGreater(pagesize, 0)
+ self.assertEqual(pagesize, resource.getpagesize())
+ self.assertEqual(pagesize, mmap.PAGESIZE)
+
+
+if __name__ == '__main__':
+ from psutil.tests.runner import run_from_name
+ run_from_name(__file__)