0
0
mirror of https://github.com/python/cpython.git synced 2024-11-21 21:09:37 +01:00
cpython/Lib/test/support/socket_helper.py
Victor Stinner a52a350977
gh-109015: Add test.support.socket_helper.tcp_blackhole() (#109016)
Skip test_asyncio, test_imaplib and test_socket tests if FreeBSD TCP
blackhole is enabled (net.inet.tcp.blackhole=2).
2023-09-07 01:58:03 +02:00

340 lines
13 KiB
Python

import contextlib
import errno
import os.path
import socket
import sys
import subprocess
import tempfile
import unittest
from .. import support
# Host name used as the default bind address by bind_port().
HOST = "localhost"
# IPv4 loopback address.
HOSTv4 = "127.0.0.1"
# IPv6 loopback address; _is_ipv6_enabled() binds to it to probe IPv6 support.
HOSTv6 = "::1"
# WASI SDK 15.0 does not provide gethostname, stub raises OSError ENOTSUP.
has_gethostname = not support.is_wasi
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an unused port that should be suitable for binding.

    A temporary socket of the given *family* and *socktype* (AF_INET and
    SOCK_STREAM by default) is created and handed to bind_port(), which binds
    it to its default host with the port set to 0, eliciting an unused
    ephemeral port from the OS.  The temporary socket is then closed and the
    ephemeral port is returned.

    Either this function or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is
    creating a Python socket, or if an unused port needs to be provided in a
    constructor or passed to an external program (i.e. the -accept argument
    to openssl's s_server mode).  Always prefer bind_port() over
    find_unused_port() where possible.

    Hard coded ports should *NEVER* be used.  As soon as a server socket is
    bound to a hard coded port, the ability to run multiple instances of the
    test simultaneously on the same host is compromised, which makes the test
    a ticking time bomb in a buildbot environment.  On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced
    to the SO_REUSEADDR socket option having different semantics on Windows
    versus Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM
    sockets bind, listen and then accept connections on identical
    host/ports.  An EADDRINUSE OSError will be raised at some point
    (depending on the platform and the order bind and listen were called on
    each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports.
    When accept() is called on each socket, the second caller's process will
    steal the port from the first caller, leaving them both in an awkwardly
    wedged state where they'll no longer respond to any signals or graceful
    kills, and must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the
    Open Source world compared to Windows ones, this is a common mistake.  A
    quick look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR
    when openssl.exe is called with the 's_server' option, for example.  See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    https://learn.microsoft.com/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the
    ephemeral port returned to us by the OS won't immediately be dished back
    out to some other process when we close our temporary socket but before
    our calling code has a chance to bind the returned port.  We can deal
    with this issue if/when we come across it.
    """
    # Leaving the "with" block closes the probe socket before the port
    # number reaches the caller.
    with socket.socket(family, socktype) as probe:
        return bind_port(probe)
def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.

    Relies on ephemeral ports in order to ensure we are using an unbound
    port.  This is important as many tests may be running simultaneously,
    especially in a buildbot environment.

    Tests should *never* set SO_REUSEADDR or SO_REUSEPORT on TCP/IP sockets.
    The only case for setting these options is testing multicasting via
    multiple UDP sockets.

    Additionally, for AF_INET/SOCK_STREAM sockets, if the
    SO_EXCLUSIVEADDRUSE socket option is available (i.e. on Windows), it is
    set on the socket.  This prevents anyone else from bind()'ing to our
    host/port for the duration of the test.

    Parameters:
        sock: the socket to bind.
        host: the host to bind to; the port is always 0 so that the OS picks
            a free ephemeral port.

    Returns:
        The port number the socket was bound to.

    Raises:
        support.TestFailed: if sock.family is AF_INET and sock.type is
            SOCK_STREAM, *and* the socket has SO_REUSEADDR or SO_REUSEPORT
            enabled.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            # Treat any non-zero value as "enabled": a boolean socket option
            # is not guaranteed to read back as exactly 1 on every platform.
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) != 0:
                raise support.TestFailed("tests should never set the "
                                         "SO_REUSEADDR socket option on "
                                         "TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                reuse_port = sock.getsockopt(socket.SOL_SOCKET,
                                             socket.SO_REUSEPORT)
            except OSError:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                reuse_port = 0
            if reuse_port != 0:
                raise support.TestFailed("tests should never set the "
                                         "SO_REUSEPORT socket option on "
                                         "TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 asks the OS for a free ephemeral port; read back what we got.
    sock.bind((host, 0))
    return sock.getsockname()[1]
def bind_unix_socket(sock, addr):
    """Bind the AF_UNIX socket *sock* to *addr*.

    If bind() is refused with PermissionError, the socket is closed and
    unittest.SkipTest is raised so that the calling test is skipped rather
    than failed.  Any other exception from bind() propagates unchanged.
    """
    assert sock.family == socket.AF_UNIX
    try:
        sock.bind(addr)
    except PermissionError:
        # The socket is useless to the caller now: release it before skipping.
        sock.close()
        raise unittest.SkipTest('cannot bind AF_UNIX sockets')
def _is_ipv6_enabled():
    """Check whether IPv6 is enabled on this host.

    Returns True only if the socket module reports IPv6 support *and* an
    AF_INET6 stream socket can actually be created and bound to HOSTv6 on an
    ephemeral port; any OSError along the way yields False.
    """
    if not socket.has_ipv6:
        return False

    probe = None
    try:
        probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        probe.bind((HOSTv6, 0))
    except OSError:
        enabled = False
    else:
        enabled = True
    finally:
        # The probe socket only exists if its creation succeeded.
        if probe:
            probe.close()
    return enabled
# Computed once at import time: True if an IPv6 socket could be bound to
# HOSTv6 (see _is_ipv6_enabled()).
IPV6_ENABLED = _is_ipv6_enabled()
# Cached outcome of the AF_UNIX bind() probe done by
# skip_unless_bind_unix_socket(): None until probed, False if bind() worked,
# otherwise the OSError that bind() raised.
_bind_nix_socket_error = None


def skip_unless_bind_unix_socket(test):
    """Decorator for tests requiring a functional bind() for unix sockets.

    The test is skipped if the socket module has no AF_UNIX, or if binding a
    unix socket to a temporary path fails with OSError.  The bind() probe is
    performed at most once; its outcome is cached at module level.
    """
    global _bind_nix_socket_error

    if not hasattr(socket, 'AF_UNIX'):
        return unittest.skip('No UNIX Sockets')(test)

    if _bind_nix_socket_error is None:
        from .os_helper import TESTFN, unlink
        probe_path = TESTFN + "can_bind_unix_socket"
        with socket.socket(socket.AF_UNIX) as probe:
            try:
                probe.bind(probe_path)
            except OSError as exc:
                _bind_nix_socket_error = exc
            else:
                _bind_nix_socket_error = False
            finally:
                # Remove the socket file, whether or not bind() created it.
                unlink(probe_path)

    if not _bind_nix_socket_error:
        return test
    reason = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
    return unittest.skip(reason)(test)
def get_socket_conn_refused_errs():
    """
    Return the list of socket error numbers ('errno') which can be received
    when a connection is refused.

    ECONNREFUSED always comes first, followed by the optional error numbers
    that the errno module defines on this platform.  EAFNOSUPPORT is appended
    when IPv6 is not enabled.
    """
    optional_names = (
        # On Solaris, ENETUNREACH is returned sometimes instead of
        # ECONNREFUSED
        'ENETUNREACH',
        # bpo-31910: socket.create_connection() fails randomly
        # with EADDRNOTAVAIL on Travis CI
        'EADDRNOTAVAIL',
        # bpo-37583: The destination host cannot be reached
        'EHOSTUNREACH',
    )
    refused = [errno.ECONNREFUSED]
    refused.extend(getattr(errno, name)
                   for name in optional_names
                   if hasattr(errno, name))
    if not IPV6_ENABLED:
        refused.append(errno.EAFNOSUPPORT)
    return refused
# Sentinel telling "timeout argument not given" apart from an explicit None.
_NOT_SET = object()


@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the internet connection manifest themselves as exceptions.

    Parameters:
        resource_name: name of the resource, used in the ResourceDenied
            message.
        timeout: default socket timeout installed while the block runs.
            Defaults to support.INTERNET_TIMEOUT; None leaves the current
            default socket timeout untouched.
        errnos: errno values to treat as transient.  When empty, a built-in
            list of errno values and getaddrinfo() error codes is used; when
            given, no getaddrinfo() error code is treated as transient.

    Raises:
        support.ResourceDenied: chained to the original error, when an
            OSError raised inside the block looks like a transient network
            failure.  Any other OSError is re-raised unchanged.

    The previous default socket timeout is always restored on exit.
    """
    import urllib.error
    if timeout is _NOT_SET:
        timeout = support.INTERNET_TIMEOUT

    # (name, fallback value) pairs: the fallback number is used when the
    # errno/socket module does not define the name on this platform.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
        # socket.create_connection() fails randomly with
        # EADDRNOTAVAIL on Travis CI.
        ('EADDRNOTAVAIL', 99),
    ]
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]
    # Substrings of a textual URLError.reason that denote a transient failure.
    transient_reasons = ("ConnectionRefusedError", "TimeoutError", "EOFError")

    denied = support.ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        """Raise ResourceDenied from *err* if it looks transient."""
        n = getattr(err, 'errno', None)
        # URLError.reason is not always a string (it can be None or another
        # exception object); only search textual reasons so that a TypeError
        # from the "in" test cannot mask the original network error.
        reason = getattr(err, 'reason', None)
        if (isinstance(err, TimeoutError) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            (isinstance(err, urllib.error.HTTPError) and
             500 <= err.code <= 599) or
            (isinstance(err, urllib.error.URLError) and
             isinstance(reason, str) and
             any(name in reason for name in transient_reasons)) or
            n in captured_errnos):
            if not support.verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], OSError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise OSError('socket error', msg) from msg
            elif len(a) >= 2 and isinstance(a[1], OSError):
                err = a[1]
            else:
                break
        filter_error(err)
        # Not a transient failure: re-raise the exception as it was caught.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
def create_unix_domain_name():
    """
    Create a UNIX domain name: socket.bind() argument of a AF_UNIX socket.

    Return a path relative to the current directory to get a short path
    (around 27 ASCII characters).  Only the name is generated: no file is
    created.
    """
    here = os.path.curdir
    return tempfile.mktemp(suffix='.sock', prefix="test_python_", dir=here)
# consider that sysctl values should not change while tests are running
_sysctl_cache = {}


def _get_sysctl(name):
    """Get a sysctl value as an integer.

    Runs "sysctl -n <name>" and parses its output.  Results, including
    failures, are cached per *name* in _sysctl_cache, so the command runs
    and a warning is logged at most once per name.

    Returns:
        The integer value, or None if the command could not be run, exited
        with a non-zero status, or printed something that is not an integer.
    """
    try:
        return _sysctl_cache[name]
    except KeyError:
        pass

    # At least Linux and FreeBSD support the "-n" option
    cmd = ['sysctl', '-n', name]
    # Joined outside the f-strings below: reusing the enclosing quote inside
    # a replacement field is a syntax error before Python 3.12.
    cmd_text = ' '.join(cmd)
    try:
        proc = subprocess.run(cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              text=True)
    except OSError as exc:
        # e.g. the sysctl program is missing or cannot be executed
        support.print_warning(f'{cmd_text!r} command failed to run: {exc!r}')
        # cache the error to only log the warning once
        _sysctl_cache[name] = None
        return None

    if proc.returncode:
        support.print_warning(f'{cmd_text!r} command failed with '
                              f'exit code {proc.returncode}')
        # cache the error to only log the warning once
        _sysctl_cache[name] = None
        return None
    output = proc.stdout

    # Parse '0\n' to get '0'
    try:
        value = int(output.strip())
    except ValueError as exc:
        support.print_warning(f'Failed to parse {cmd_text!r} '
                              f'command output {output!r}: {exc!r}')
        # cache the error to only log the warning once
        _sysctl_cache[name] = None
        return None

    _sysctl_cache[name] = value
    return value
def tcp_blackhole():
    """Return True if the FreeBSD TCP blackhole is enabled.

    Always False on platforms other than FreeBSD, and also False when the
    net.inet.tcp.blackhole sysctl value cannot be obtained.
    """
    if sys.platform.startswith('freebsd'):
        # gh-109015: test if FreeBSD TCP blackhole is enabled
        setting = _get_sysctl('net.inet.tcp.blackhole')
        # don't skip if we fail to get the sysctl value (None)
        return setting is not None and setting != 0
    return False
def skip_if_tcp_blackhole(test):
    """Decorator skipping test if TCP blackhole is enabled.

    tcp_blackhole() is evaluated once, when the decorator is applied.
    """
    reason = "TCP blackhole is enabled (sysctl net.inet.tcp.blackhole)"
    return unittest.skipIf(tcp_blackhole(), reason)(test)