summaryrefslogtreecommitdiffstats
path: root/lib/psutil/tests/test_testutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/psutil/tests/test_testutils.py')
-rw-r--r--lib/psutil/tests/test_testutils.py441
1 files changed, 441 insertions, 0 deletions
diff --git a/lib/psutil/tests/test_testutils.py b/lib/psutil/tests/test_testutils.py
new file mode 100644
index 0000000..dd98538
--- /dev/null
+++ b/lib/psutil/tests/test_testutils.py
@@ -0,0 +1,441 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Tests for testing utils (psutil.tests namespace).
+"""
+
+import collections
+import contextlib
+import errno
+import os
+import socket
+import stat
+import subprocess
+import unittest
+
+import psutil
+import psutil.tests
+from psutil import FREEBSD
+from psutil import NETBSD
+from psutil import POSIX
+from psutil._common import open_binary
+from psutil._common import open_text
+from psutil._common import supports_ipv6
+from psutil.tests import CI_TESTING
+from psutil.tests import HAS_CONNECTIONS_UNIX
+from psutil.tests import PYTHON_EXE
+from psutil.tests import PsutilTestCase
+from psutil.tests import TestMemoryLeak
+from psutil.tests import bind_socket
+from psutil.tests import bind_unix_socket
+from psutil.tests import call_until
+from psutil.tests import chdir
+from psutil.tests import create_sockets
+from psutil.tests import get_free_port
+from psutil.tests import is_namedtuple
+from psutil.tests import mock
+from psutil.tests import process_namespace
+from psutil.tests import reap_children
+from psutil.tests import retry
+from psutil.tests import retry_on_failure
+from psutil.tests import safe_mkdir
+from psutil.tests import safe_rmpath
+from psutil.tests import serialrun
+from psutil.tests import system_namespace
+from psutil.tests import tcp_socketpair
+from psutil.tests import terminate
+from psutil.tests import unix_socketpair
+from psutil.tests import wait_for_file
+from psutil.tests import wait_for_pid
+
+
+# ===================================================================
+# --- Unit tests for test utilities.
+# ===================================================================
+
+
+class TestRetryDecorator(PsutilTestCase):
+
+ @mock.patch('time.sleep')
+ def test_retry_success(self, sleep):
+ # Fail 3 times out of 5; make sure the decorated fun returns.
+
+ @retry(retries=5, interval=1, logfun=None)
+ def foo():
+ while queue:
+ queue.pop()
+ 1 / 0
+ return 1
+
+ queue = list(range(3))
+ self.assertEqual(foo(), 1)
+ self.assertEqual(sleep.call_count, 3)
+
+ @mock.patch('time.sleep')
+ def test_retry_failure(self, sleep):
+ # Fail 6 times out of 5; th function is supposed to raise exc.
+ @retry(retries=5, interval=1, logfun=None)
+ def foo():
+ while queue:
+ queue.pop()
+ 1 / 0
+ return 1
+
+ queue = list(range(6))
+ self.assertRaises(ZeroDivisionError, foo)
+ self.assertEqual(sleep.call_count, 5)
+
+ @mock.patch('time.sleep')
+ def test_exception_arg(self, sleep):
+ @retry(exception=ValueError, interval=1)
+ def foo():
+ raise TypeError
+
+ self.assertRaises(TypeError, foo)
+ self.assertEqual(sleep.call_count, 0)
+
+ @mock.patch('time.sleep')
+ def test_no_interval_arg(self, sleep):
+ # if interval is not specified sleep is not supposed to be called
+
+ @retry(retries=5, interval=None, logfun=None)
+ def foo():
+ 1 / 0
+
+ self.assertRaises(ZeroDivisionError, foo)
+ self.assertEqual(sleep.call_count, 0)
+
+ @mock.patch('time.sleep')
+ def test_retries_arg(self, sleep):
+
+ @retry(retries=5, interval=1, logfun=None)
+ def foo():
+ 1 / 0
+
+ self.assertRaises(ZeroDivisionError, foo)
+ self.assertEqual(sleep.call_count, 5)
+
+ @mock.patch('time.sleep')
+ def test_retries_and_timeout_args(self, sleep):
+ self.assertRaises(ValueError, retry, retries=5, timeout=1)
+
+
+class TestSyncTestUtils(PsutilTestCase):
+
+ def test_wait_for_pid(self):
+ wait_for_pid(os.getpid())
+ nopid = max(psutil.pids()) + 99999
+ with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
+ self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
+
+ def test_wait_for_file(self):
+ testfn = self.get_testfn()
+ with open(testfn, 'w') as f:
+ f.write('foo')
+ wait_for_file(testfn)
+ assert not os.path.exists(testfn)
+
+ def test_wait_for_file_empty(self):
+ testfn = self.get_testfn()
+ with open(testfn, 'w'):
+ pass
+ wait_for_file(testfn, empty=True)
+ assert not os.path.exists(testfn)
+
+ def test_wait_for_file_no_file(self):
+ testfn = self.get_testfn()
+ with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
+ self.assertRaises(IOError, wait_for_file, testfn)
+
+ def test_wait_for_file_no_delete(self):
+ testfn = self.get_testfn()
+ with open(testfn, 'w') as f:
+ f.write('foo')
+ wait_for_file(testfn, delete=False)
+ assert os.path.exists(testfn)
+
+ def test_call_until(self):
+ ret = call_until(lambda: 1, "ret == 1")
+ self.assertEqual(ret, 1)
+
+
+class TestFSTestUtils(PsutilTestCase):
+
+ def test_open_text(self):
+ with open_text(__file__) as f:
+ self.assertEqual(f.mode, 'rt')
+
+ def test_open_binary(self):
+ with open_binary(__file__) as f:
+ self.assertEqual(f.mode, 'rb')
+
+ def test_safe_mkdir(self):
+ testfn = self.get_testfn()
+ safe_mkdir(testfn)
+ assert os.path.isdir(testfn)
+ safe_mkdir(testfn)
+ assert os.path.isdir(testfn)
+
+ def test_safe_rmpath(self):
+ # test file is removed
+ testfn = self.get_testfn()
+ open(testfn, 'w').close()
+ safe_rmpath(testfn)
+ assert not os.path.exists(testfn)
+ # test no exception if path does not exist
+ safe_rmpath(testfn)
+ # test dir is removed
+ os.mkdir(testfn)
+ safe_rmpath(testfn)
+ assert not os.path.exists(testfn)
+ # test other exceptions are raised
+ with mock.patch('psutil.tests.os.stat',
+ side_effect=OSError(errno.EINVAL, "")) as m:
+ with self.assertRaises(OSError):
+ safe_rmpath(testfn)
+ assert m.called
+
+ def test_chdir(self):
+ testfn = self.get_testfn()
+ base = os.getcwd()
+ os.mkdir(testfn)
+ with chdir(testfn):
+ self.assertEqual(os.getcwd(), os.path.join(base, testfn))
+ self.assertEqual(os.getcwd(), base)
+
+
+class TestProcessUtils(PsutilTestCase):
+
+ def test_reap_children(self):
+ subp = self.spawn_testproc()
+ p = psutil.Process(subp.pid)
+ assert p.is_running()
+ reap_children()
+ assert not p.is_running()
+ assert not psutil.tests._pids_started
+ assert not psutil.tests._subprocesses_started
+
+ def test_spawn_children_pair(self):
+ child, grandchild = self.spawn_children_pair()
+ self.assertNotEqual(child.pid, grandchild.pid)
+ assert child.is_running()
+ assert grandchild.is_running()
+ children = psutil.Process().children()
+ self.assertEqual(children, [child])
+ children = psutil.Process().children(recursive=True)
+ self.assertEqual(len(children), 2)
+ self.assertIn(child, children)
+ self.assertIn(grandchild, children)
+ self.assertEqual(child.ppid(), os.getpid())
+ self.assertEqual(grandchild.ppid(), child.pid)
+
+ terminate(child)
+ assert not child.is_running()
+ assert grandchild.is_running()
+
+ terminate(grandchild)
+ assert not grandchild.is_running()
+
+ @unittest.skipIf(not POSIX, "POSIX only")
+ def test_spawn_zombie(self):
+ parent, zombie = self.spawn_zombie()
+ self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE)
+
+ def test_terminate(self):
+ # by subprocess.Popen
+ p = self.spawn_testproc()
+ terminate(p)
+ self.assertProcessGone(p)
+ terminate(p)
+ # by psutil.Process
+ p = psutil.Process(self.spawn_testproc().pid)
+ terminate(p)
+ self.assertProcessGone(p)
+ terminate(p)
+ # by psutil.Popen
+ cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
+ p = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ terminate(p)
+ self.assertProcessGone(p)
+ terminate(p)
+ # by PID
+ pid = self.spawn_testproc().pid
+ terminate(pid)
+ self.assertProcessGone(p)
+ terminate(pid)
+ # zombie
+ if POSIX:
+ parent, zombie = self.spawn_zombie()
+ terminate(parent)
+ terminate(zombie)
+ self.assertProcessGone(parent)
+ self.assertProcessGone(zombie)
+
+
+class TestNetUtils(PsutilTestCase):
+
+ def bind_socket(self):
+ port = get_free_port()
+ with contextlib.closing(bind_socket(addr=('', port))) as s:
+ self.assertEqual(s.getsockname()[1], port)
+
+ @unittest.skipIf(not POSIX, "POSIX only")
+ def test_bind_unix_socket(self):
+ name = self.get_testfn()
+ sock = bind_unix_socket(name)
+ with contextlib.closing(sock):
+ self.assertEqual(sock.family, socket.AF_UNIX)
+ self.assertEqual(sock.type, socket.SOCK_STREAM)
+ self.assertEqual(sock.getsockname(), name)
+ assert os.path.exists(name)
+ assert stat.S_ISSOCK(os.stat(name).st_mode)
+ # UDP
+ name = self.get_testfn()
+ sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
+ with contextlib.closing(sock):
+ self.assertEqual(sock.type, socket.SOCK_DGRAM)
+
+ def tcp_tcp_socketpair(self):
+ addr = ("127.0.0.1", get_free_port())
+ server, client = tcp_socketpair(socket.AF_INET, addr=addr)
+ with contextlib.closing(server):
+ with contextlib.closing(client):
+ # Ensure they are connected and the positions are
+ # correct.
+ self.assertEqual(server.getsockname(), addr)
+ self.assertEqual(client.getpeername(), addr)
+ self.assertNotEqual(client.getsockname(), addr)
+
+ @unittest.skipIf(not POSIX, "POSIX only")
+ @unittest.skipIf(NETBSD or FREEBSD,
+ "/var/run/log UNIX socket opened by default")
+ def test_unix_socketpair(self):
+ p = psutil.Process()
+ num_fds = p.num_fds()
+ assert not p.connections(kind='unix')
+ name = self.get_testfn()
+ server, client = unix_socketpair(name)
+ try:
+ assert os.path.exists(name)
+ assert stat.S_ISSOCK(os.stat(name).st_mode)
+ self.assertEqual(p.num_fds() - num_fds, 2)
+ self.assertEqual(len(p.connections(kind='unix')), 2)
+ self.assertEqual(server.getsockname(), name)
+ self.assertEqual(client.getpeername(), name)
+ finally:
+ client.close()
+ server.close()
+
+ def test_create_sockets(self):
+ with create_sockets() as socks:
+ fams = collections.defaultdict(int)
+ types = collections.defaultdict(int)
+ for s in socks:
+ fams[s.family] += 1
+ # work around http://bugs.python.org/issue30204
+ types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1
+ self.assertGreaterEqual(fams[socket.AF_INET], 2)
+ if supports_ipv6():
+ self.assertGreaterEqual(fams[socket.AF_INET6], 2)
+ if POSIX and HAS_CONNECTIONS_UNIX:
+ self.assertGreaterEqual(fams[socket.AF_UNIX], 2)
+ self.assertGreaterEqual(types[socket.SOCK_STREAM], 2)
+ self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2)
+
+
+@serialrun
+class TestMemLeakClass(TestMemoryLeak):
+
+ @retry_on_failure()
+ def test_times(self):
+ def fun():
+ cnt['cnt'] += 1
+ cnt = {'cnt': 0}
+ self.execute(fun, times=10, warmup_times=15)
+ self.assertEqual(cnt['cnt'], 26)
+
+ def test_param_err(self):
+ self.assertRaises(ValueError, self.execute, lambda: 0, times=0)
+ self.assertRaises(ValueError, self.execute, lambda: 0, times=-1)
+ self.assertRaises(ValueError, self.execute, lambda: 0, warmup_times=-1)
+ self.assertRaises(ValueError, self.execute, lambda: 0, tolerance=-1)
+ self.assertRaises(ValueError, self.execute, lambda: 0, retries=-1)
+
+ @retry_on_failure()
+ @unittest.skipIf(CI_TESTING, "skipped on CI")
+ def test_leak_mem(self):
+ ls = []
+
+ def fun(ls=ls):
+ ls.append("x" * 24 * 1024)
+
+ try:
+ # will consume around 3M in total
+ self.assertRaisesRegex(AssertionError, "extra-mem",
+ self.execute, fun, times=50)
+ finally:
+ del ls
+
+ def test_unclosed_files(self):
+ def fun():
+ f = open(__file__)
+ self.addCleanup(f.close)
+ box.append(f)
+
+ box = []
+ kind = "fd" if POSIX else "handle"
+ self.assertRaisesRegex(AssertionError, "unclosed " + kind,
+ self.execute, fun)
+
+ def test_tolerance(self):
+ def fun():
+ ls.append("x" * 24 * 1024)
+ ls = []
+ times = 100
+ self.execute(fun, times=times, warmup_times=0,
+ tolerance=200 * 1024 * 1024)
+ self.assertEqual(len(ls), times + 1)
+
+ def test_execute_w_exc(self):
+ def fun():
+ 1 / 0
+ self.execute_w_exc(ZeroDivisionError, fun)
+ with self.assertRaises(ZeroDivisionError):
+ self.execute_w_exc(OSError, fun)
+
+ def fun():
+ pass
+ with self.assertRaises(AssertionError):
+ self.execute_w_exc(ZeroDivisionError, fun)
+
+
+class TestTestingUtils(PsutilTestCase):
+
+ def test_process_namespace(self):
+ p = psutil.Process()
+ ns = process_namespace(p)
+ ns.test()
+ fun = [x for x in ns.iter(ns.getters) if x[1] == 'ppid'][0][0]
+ self.assertEqual(fun(), p.ppid())
+
+ def test_system_namespace(self):
+ ns = system_namespace()
+ fun = [x for x in ns.iter(ns.getters) if x[1] == 'net_if_addrs'][0][0]
+ self.assertEqual(fun(), psutil.net_if_addrs())
+
+
+class TestOtherUtils(PsutilTestCase):
+
+ def test_is_namedtuple(self):
+ assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3))
+ assert not is_namedtuple(tuple())
+
+
+if __name__ == '__main__':
+ from psutil.tests.runner import run_from_name
+ run_from_name(__file__)