summaryrefslogtreecommitdiffstats
path: root/lib/psutil/tests/test_connections.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/psutil/tests/test_connections.py')
-rw-r--r--lib/psutil/tests/test_connections.py554
1 files changed, 554 insertions, 0 deletions
diff --git a/lib/psutil/tests/test_connections.py b/lib/psutil/tests/test_connections.py
new file mode 100644
index 0000000..f3b1f83
--- /dev/null
+++ b/lib/psutil/tests/test_connections.py
@@ -0,0 +1,554 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for net_connections() and Process.connections() APIs."""
+
+import os
+import socket
+import textwrap
+import unittest
+from contextlib import closing
+from socket import AF_INET
+from socket import AF_INET6
+from socket import SOCK_DGRAM
+from socket import SOCK_STREAM
+
+import psutil
+from psutil import FREEBSD
+from psutil import LINUX
+from psutil import MACOS
+from psutil import NETBSD
+from psutil import OPENBSD
+from psutil import POSIX
+from psutil import SUNOS
+from psutil import WINDOWS
+from psutil._common import supports_ipv6
+from psutil._compat import PY3
+from psutil.tests import AF_UNIX
+from psutil.tests import HAS_CONNECTIONS_UNIX
+from psutil.tests import SKIP_SYSCONS
+from psutil.tests import PsutilTestCase
+from psutil.tests import bind_socket
+from psutil.tests import bind_unix_socket
+from psutil.tests import check_connection_ntuple
+from psutil.tests import create_sockets
+from psutil.tests import reap_children
+from psutil.tests import retry_on_failure
+from psutil.tests import serialrun
+from psutil.tests import skip_on_access_denied
+from psutil.tests import tcp_socketpair
+from psutil.tests import unix_socketpair
+from psutil.tests import wait_for_file
+
+
+thisproc = psutil.Process()
+SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
+
+
+@serialrun
+class ConnectionTestCase(PsutilTestCase):
+
+ def setUp(self):
+ if not (NETBSD or FREEBSD):
+ # process opens a UNIX socket to /var/log/run.
+ cons = thisproc.connections(kind='all')
+ assert not cons, cons
+
+ def tearDown(self):
+ if not (FREEBSD or NETBSD):
+ # Make sure we closed all resources.
+ # NetBSD opens a UNIX socket to /var/log/run.
+ cons = thisproc.connections(kind='all')
+ assert not cons, cons
+
+ def compare_procsys_connections(self, pid, proc_cons, kind='all'):
+ """Given a process PID and its list of connections compare
+ those against system-wide connections retrieved via
+ psutil.net_connections.
+ """
+ try:
+ sys_cons = psutil.net_connections(kind=kind)
+ except psutil.AccessDenied:
+ # On MACOS, system-wide connections are retrieved by iterating
+ # over all processes
+ if MACOS:
+ return
+ else:
+ raise
+ # Filter for this proc PID and exlucde PIDs from the tuple.
+ sys_cons = [c[:-1] for c in sys_cons if c.pid == pid]
+ sys_cons.sort()
+ proc_cons.sort()
+ self.assertEqual(proc_cons, sys_cons)
+
+
+class TestBasicOperations(ConnectionTestCase):
+
+ @unittest.skipIf(SKIP_SYSCONS, "requires root")
+ def test_system(self):
+ with create_sockets():
+ for conn in psutil.net_connections(kind='all'):
+ check_connection_ntuple(conn)
+
+ def test_process(self):
+ with create_sockets():
+ for conn in psutil.Process().connections(kind='all'):
+ check_connection_ntuple(conn)
+
+ def test_invalid_kind(self):
+ self.assertRaises(ValueError, thisproc.connections, kind='???')
+ self.assertRaises(ValueError, psutil.net_connections, kind='???')
+
+
+@serialrun
+class TestUnconnectedSockets(ConnectionTestCase):
+ """Tests sockets which are open but not connected to anything."""
+
+ def get_conn_from_sock(self, sock):
+ cons = thisproc.connections(kind='all')
+ smap = dict([(c.fd, c) for c in cons])
+ if NETBSD or FREEBSD:
+ # NetBSD opens a UNIX socket to /var/log/run
+ # so there may be more connections.
+ return smap[sock.fileno()]
+ else:
+ self.assertEqual(len(cons), 1)
+ if cons[0].fd != -1:
+ self.assertEqual(smap[sock.fileno()].fd, sock.fileno())
+ return cons[0]
+
+ def check_socket(self, sock):
+ """Given a socket, makes sure it matches the one obtained
+ via psutil. It assumes this process created one connection
+ only (the one supposed to be checked).
+ """
+ conn = self.get_conn_from_sock(sock)
+ check_connection_ntuple(conn)
+
+ # fd, family, type
+ if conn.fd != -1:
+ self.assertEqual(conn.fd, sock.fileno())
+ self.assertEqual(conn.family, sock.family)
+ # see: http://bugs.python.org/issue30204
+ self.assertEqual(
+ conn.type, sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE))
+
+ # local address
+ laddr = sock.getsockname()
+ if not laddr and PY3 and isinstance(laddr, bytes):
+ # See: http://bugs.python.org/issue30205
+ laddr = laddr.decode()
+ if sock.family == AF_INET6:
+ laddr = laddr[:2]
+ if sock.family == AF_UNIX and OPENBSD:
+ # No addresses are set for UNIX sockets on OpenBSD.
+ pass
+ else:
+ self.assertEqual(conn.laddr, laddr)
+
+ # XXX Solaris can't retrieve system-wide UNIX sockets
+ if sock.family == AF_UNIX and HAS_CONNECTIONS_UNIX:
+ cons = thisproc.connections(kind='all')
+ self.compare_procsys_connections(os.getpid(), cons, kind='all')
+ return conn
+
+ def test_tcp_v4(self):
+ addr = ("127.0.0.1", 0)
+ with closing(bind_socket(AF_INET, SOCK_STREAM, addr=addr)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_LISTEN)
+
+ @unittest.skipIf(not supports_ipv6(), "IPv6 not supported")
+ def test_tcp_v6(self):
+ addr = ("::1", 0)
+ with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_LISTEN)
+
+ def test_udp_v4(self):
+ addr = ("127.0.0.1", 0)
+ with closing(bind_socket(AF_INET, SOCK_DGRAM, addr=addr)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_NONE)
+
+ @unittest.skipIf(not supports_ipv6(), "IPv6 not supported")
+ def test_udp_v6(self):
+ addr = ("::1", 0)
+ with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_NONE)
+
+ @unittest.skipIf(not POSIX, 'POSIX only')
+ def test_unix_tcp(self):
+ testfn = self.get_testfn()
+ with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_NONE)
+
+ @unittest.skipIf(not POSIX, 'POSIX only')
+ def test_unix_udp(self):
+ testfn = self.get_testfn()
+ with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_NONE)
+
+
+@serialrun
+class TestConnectedSocket(ConnectionTestCase):
+ """Test socket pairs which are are actually connected to
+ each other.
+ """
+
+ # On SunOS, even after we close() it, the server socket stays around
+ # in TIME_WAIT state.
+ @unittest.skipIf(SUNOS, "unreliable on SUONS")
+ def test_tcp(self):
+ addr = ("127.0.0.1", 0)
+ assert not thisproc.connections(kind='tcp4')
+ server, client = tcp_socketpair(AF_INET, addr=addr)
+ try:
+ cons = thisproc.connections(kind='tcp4')
+ self.assertEqual(len(cons), 2)
+ self.assertEqual(cons[0].status, psutil.CONN_ESTABLISHED)
+ self.assertEqual(cons[1].status, psutil.CONN_ESTABLISHED)
+ # May not be fast enough to change state so it stays
+ # commenteed.
+ # client.close()
+ # cons = thisproc.connections(kind='all')
+ # self.assertEqual(len(cons), 1)
+ # self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT)
+ finally:
+ server.close()
+ client.close()
+
+ @unittest.skipIf(not POSIX, 'POSIX only')
+ def test_unix(self):
+ testfn = self.get_testfn()
+ server, client = unix_socketpair(testfn)
+ try:
+ cons = thisproc.connections(kind='unix')
+ assert not (cons[0].laddr and cons[0].raddr)
+ assert not (cons[1].laddr and cons[1].raddr)
+ if NETBSD or FREEBSD:
+ # On NetBSD creating a UNIX socket will cause
+ # a UNIX connection to /var/run/log.
+ cons = [c for c in cons if c.raddr != '/var/run/log']
+ self.assertEqual(len(cons), 2, msg=cons)
+ if LINUX or FREEBSD or SUNOS:
+ # remote path is never set
+ self.assertEqual(cons[0].raddr, "")
+ self.assertEqual(cons[1].raddr, "")
+ # one local address should though
+ self.assertEqual(testfn, cons[0].laddr or cons[1].laddr)
+ elif OPENBSD:
+ # No addresses whatsoever here.
+ for addr in (cons[0].laddr, cons[0].raddr,
+ cons[1].laddr, cons[1].raddr):
+ self.assertEqual(addr, "")
+ else:
+ # On other systems either the laddr or raddr
+ # of both peers are set.
+ self.assertEqual(cons[0].laddr or cons[1].laddr, testfn)
+ self.assertEqual(cons[0].raddr or cons[1].raddr, testfn)
+ finally:
+ server.close()
+ client.close()
+
+
+class TestFilters(ConnectionTestCase):
+
+ def test_filters(self):
+ def check(kind, families, types):
+ for conn in thisproc.connections(kind=kind):
+ self.assertIn(conn.family, families)
+ self.assertIn(conn.type, types)
+ if not SKIP_SYSCONS:
+ for conn in psutil.net_connections(kind=kind):
+ self.assertIn(conn.family, families)
+ self.assertIn(conn.type, types)
+
+ with create_sockets():
+ check('all',
+ [AF_INET, AF_INET6, AF_UNIX],
+ [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET])
+ check('inet',
+ [AF_INET, AF_INET6],
+ [SOCK_STREAM, SOCK_DGRAM])
+ check('inet4',
+ [AF_INET],
+ [SOCK_STREAM, SOCK_DGRAM])
+ check('tcp',
+ [AF_INET, AF_INET6],
+ [SOCK_STREAM])
+ check('tcp4',
+ [AF_INET],
+ [SOCK_STREAM])
+ check('tcp6',
+ [AF_INET6],
+ [SOCK_STREAM])
+ check('udp',
+ [AF_INET, AF_INET6],
+ [SOCK_DGRAM])
+ check('udp4',
+ [AF_INET],
+ [SOCK_DGRAM])
+ check('udp6',
+ [AF_INET6],
+ [SOCK_DGRAM])
+ if HAS_CONNECTIONS_UNIX:
+ check('unix',
+ [AF_UNIX],
+ [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET])
+
+ @skip_on_access_denied(only_if=MACOS)
+ def test_combos(self):
+ reap_children()
+
+ def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
+ all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4",
+ "tcp6", "udp", "udp4", "udp6")
+ check_connection_ntuple(conn)
+ self.assertEqual(conn.family, family)
+ self.assertEqual(conn.type, type)
+ self.assertEqual(conn.laddr, laddr)
+ self.assertEqual(conn.raddr, raddr)
+ self.assertEqual(conn.status, status)
+ for kind in all_kinds:
+ cons = proc.connections(kind=kind)
+ if kind in kinds:
+ assert cons
+ else:
+ assert not cons, cons
+ # compare against system-wide connections
+ # XXX Solaris can't retrieve system-wide UNIX
+ # sockets.
+ if HAS_CONNECTIONS_UNIX:
+ self.compare_procsys_connections(proc.pid, [conn])
+
+ tcp_template = textwrap.dedent("""
+ import socket, time
+ s = socket.socket({family}, socket.SOCK_STREAM)
+ s.bind(('{addr}', 0))
+ s.listen(5)
+ with open('{testfn}', 'w') as f:
+ f.write(str(s.getsockname()[:2]))
+ time.sleep(60)
+ """)
+
+ udp_template = textwrap.dedent("""
+ import socket, time
+ s = socket.socket({family}, socket.SOCK_DGRAM)
+ s.bind(('{addr}', 0))
+ with open('{testfn}', 'w') as f:
+ f.write(str(s.getsockname()[:2]))
+ time.sleep(60)
+ """)
+
+ # must be relative on Windows
+ testfile = os.path.basename(self.get_testfn(dir=os.getcwd()))
+ tcp4_template = tcp_template.format(
+ family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
+ udp4_template = udp_template.format(
+ family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
+ tcp6_template = tcp_template.format(
+ family=int(AF_INET6), addr="::1", testfn=testfile)
+ udp6_template = udp_template.format(
+ family=int(AF_INET6), addr="::1", testfn=testfile)
+
+ # launch various subprocess instantiating a socket of various
+ # families and types to enrich psutil results
+ tcp4_proc = self.pyrun(tcp4_template)
+ tcp4_addr = eval(wait_for_file(testfile, delete=True))
+ udp4_proc = self.pyrun(udp4_template)
+ udp4_addr = eval(wait_for_file(testfile, delete=True))
+ if supports_ipv6():
+ tcp6_proc = self.pyrun(tcp6_template)
+ tcp6_addr = eval(wait_for_file(testfile, delete=True))
+ udp6_proc = self.pyrun(udp6_template)
+ udp6_addr = eval(wait_for_file(testfile, delete=True))
+ else:
+ tcp6_proc = None
+ udp6_proc = None
+ tcp6_addr = None
+ udp6_addr = None
+
+ for p in thisproc.children():
+ cons = p.connections()
+ self.assertEqual(len(cons), 1)
+ for conn in cons:
+ # TCP v4
+ if p.pid == tcp4_proc.pid:
+ check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (),
+ psutil.CONN_LISTEN,
+ ("all", "inet", "inet4", "tcp", "tcp4"))
+ # UDP v4
+ elif p.pid == udp4_proc.pid:
+ check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (),
+ psutil.CONN_NONE,
+ ("all", "inet", "inet4", "udp", "udp4"))
+ # TCP v6
+ elif p.pid == getattr(tcp6_proc, "pid", None):
+ check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (),
+ psutil.CONN_LISTEN,
+ ("all", "inet", "inet6", "tcp", "tcp6"))
+ # UDP v6
+ elif p.pid == getattr(udp6_proc, "pid", None):
+ check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (),
+ psutil.CONN_NONE,
+ ("all", "inet", "inet6", "udp", "udp6"))
+
+ def test_count(self):
+ with create_sockets():
+ # tcp
+ cons = thisproc.connections(kind='tcp')
+ self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
+ for conn in cons:
+ self.assertIn(conn.family, (AF_INET, AF_INET6))
+ self.assertEqual(conn.type, SOCK_STREAM)
+ # tcp4
+ cons = thisproc.connections(kind='tcp4')
+ self.assertEqual(len(cons), 1)
+ self.assertEqual(cons[0].family, AF_INET)
+ self.assertEqual(cons[0].type, SOCK_STREAM)
+ # tcp6
+ if supports_ipv6():
+ cons = thisproc.connections(kind='tcp6')
+ self.assertEqual(len(cons), 1)
+ self.assertEqual(cons[0].family, AF_INET6)
+ self.assertEqual(cons[0].type, SOCK_STREAM)
+ # udp
+ cons = thisproc.connections(kind='udp')
+ self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
+ for conn in cons:
+ self.assertIn(conn.family, (AF_INET, AF_INET6))
+ self.assertEqual(conn.type, SOCK_DGRAM)
+ # udp4
+ cons = thisproc.connections(kind='udp4')
+ self.assertEqual(len(cons), 1)
+ self.assertEqual(cons[0].family, AF_INET)
+ self.assertEqual(cons[0].type, SOCK_DGRAM)
+ # udp6
+ if supports_ipv6():
+ cons = thisproc.connections(kind='udp6')
+ self.assertEqual(len(cons), 1)
+ self.assertEqual(cons[0].family, AF_INET6)
+ self.assertEqual(cons[0].type, SOCK_DGRAM)
+ # inet
+ cons = thisproc.connections(kind='inet')
+ self.assertEqual(len(cons), 4 if supports_ipv6() else 2)
+ for conn in cons:
+ self.assertIn(conn.family, (AF_INET, AF_INET6))
+ self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
+ # inet6
+ if supports_ipv6():
+ cons = thisproc.connections(kind='inet6')
+ self.assertEqual(len(cons), 2)
+ for conn in cons:
+ self.assertEqual(conn.family, AF_INET6)
+ self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
+ # Skipped on BSD becayse by default the Python process
+ # creates a UNIX socket to '/var/run/log'.
+ if HAS_CONNECTIONS_UNIX and not (FREEBSD or NETBSD):
+ cons = thisproc.connections(kind='unix')
+ self.assertEqual(len(cons), 3)
+ for conn in cons:
+ self.assertEqual(conn.family, AF_UNIX)
+ self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
+
+
+@unittest.skipIf(SKIP_SYSCONS, "requires root")
+class TestSystemWideConnections(ConnectionTestCase):
+ """Tests for net_connections()."""
+
+ def test_it(self):
+ def check(cons, families, types_):
+ for conn in cons:
+ self.assertIn(conn.family, families, msg=conn)
+ if conn.family != AF_UNIX:
+ self.assertIn(conn.type, types_, msg=conn)
+ check_connection_ntuple(conn)
+
+ with create_sockets():
+ from psutil._common import conn_tmap
+ for kind, groups in conn_tmap.items():
+ # XXX: SunOS does not retrieve UNIX sockets.
+ if kind == 'unix' and not HAS_CONNECTIONS_UNIX:
+ continue
+ families, types_ = groups
+ cons = psutil.net_connections(kind)
+ self.assertEqual(len(cons), len(set(cons)))
+ check(cons, families, types_)
+
+ @retry_on_failure()
+ def test_multi_sockets_procs(self):
+ # Creates multiple sub processes, each creating different
+ # sockets. For each process check that proc.connections()
+ # and net_connections() return the same results.
+ # This is done mainly to check whether net_connections()'s
+ # pid is properly set, see:
+ # https://github.com/giampaolo/psutil/issues/1013
+ with create_sockets() as socks:
+ expected = len(socks)
+ pids = []
+ times = 10
+ fnames = []
+ for i in range(times):
+ fname = self.get_testfn()
+ fnames.append(fname)
+ src = textwrap.dedent("""\
+ import time, os
+ from psutil.tests import create_sockets
+ with create_sockets():
+ with open(r'%s', 'w') as f:
+ f.write("hello")
+ time.sleep(60)
+ """ % fname)
+ sproc = self.pyrun(src)
+ pids.append(sproc.pid)
+
+ # sync
+ for fname in fnames:
+ wait_for_file(fname)
+
+ syscons = [x for x in psutil.net_connections(kind='all') if x.pid
+ in pids]
+ for pid in pids:
+ self.assertEqual(len([x for x in syscons if x.pid == pid]),
+ expected)
+ p = psutil.Process(pid)
+ self.assertEqual(len(p.connections('all')), expected)
+
+
+class TestMisc(PsutilTestCase):
+
+ def test_connection_constants(self):
+ ints = []
+ strs = []
+ for name in dir(psutil):
+ if name.startswith('CONN_'):
+ num = getattr(psutil, name)
+ str_ = str(num)
+ assert str_.isupper(), str_
+ self.assertNotIn(str, strs)
+ self.assertNotIn(num, ints)
+ ints.append(num)
+ strs.append(str_)
+ if SUNOS:
+ psutil.CONN_IDLE
+ psutil.CONN_BOUND
+ if WINDOWS:
+ psutil.CONN_DELETE_TCB
+
+
+if __name__ == '__main__':
+ from psutil.tests.runner import run_from_name
+ run_from_name(__file__)