mirror of
https://github.com/python/cpython.git
synced 2024-11-21 21:09:37 +01:00
760872efec
There was a deadlock when `ProcessPoolExecutor` shuts down at the same time that a queueing thread handles an error processing a task. Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use an internal lock instead. This fixes the ordering deadlock where the `ExecutorManagerThread` holds the `_shutdown_lock` and joins the queueing thread, while the queueing thread is attempting to acquire the `_shutdown_lock` while closing the `_ThreadWakeup`.
346 lines
13 KiB
Python
346 lines
13 KiB
Python
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
import unittest
|
|
from concurrent import futures
|
|
|
|
from test import support
|
|
from test.support.script_helper import assert_python_ok
|
|
|
|
from .util import (
|
|
BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
|
|
ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
|
|
create_executor_tests, setup_module)
|
|
|
|
|
|
def sleep_and_print(t, msg):
|
|
time.sleep(t)
|
|
print(msg)
|
|
sys.stdout.flush()
|
|
|
|
|
|
class ExecutorShutdownTest:
|
|
def test_run_after_shutdown(self):
|
|
self.executor.shutdown()
|
|
self.assertRaises(RuntimeError,
|
|
self.executor.submit,
|
|
pow, 2, 5)
|
|
|
|
def test_interpreter_shutdown(self):
|
|
# Test the atexit hook for shutdown of worker threads and processes
|
|
rc, out, err = assert_python_ok('-c', """if 1:
|
|
from concurrent.futures import {executor_type}
|
|
from time import sleep
|
|
from test.test_concurrent_futures.test_shutdown import sleep_and_print
|
|
if __name__ == "__main__":
|
|
context = '{context}'
|
|
if context == "":
|
|
t = {executor_type}(5)
|
|
else:
|
|
from multiprocessing import get_context
|
|
context = get_context(context)
|
|
t = {executor_type}(5, mp_context=context)
|
|
t.submit(sleep_and_print, 1.0, "apple")
|
|
""".format(executor_type=self.executor_type.__name__,
|
|
context=getattr(self, "ctx", "")))
|
|
# Errors in atexit hooks don't change the process exit code, check
|
|
# stderr manually.
|
|
self.assertFalse(err)
|
|
self.assertEqual(out.strip(), b"apple")
|
|
|
|
def test_submit_after_interpreter_shutdown(self):
|
|
# Test the atexit hook for shutdown of worker threads and processes
|
|
rc, out, err = assert_python_ok('-c', """if 1:
|
|
import atexit
|
|
@atexit.register
|
|
def run_last():
|
|
try:
|
|
t.submit(id, None)
|
|
except RuntimeError:
|
|
print("runtime-error")
|
|
raise
|
|
from concurrent.futures import {executor_type}
|
|
if __name__ == "__main__":
|
|
context = '{context}'
|
|
if not context:
|
|
t = {executor_type}(5)
|
|
else:
|
|
from multiprocessing import get_context
|
|
context = get_context(context)
|
|
t = {executor_type}(5, mp_context=context)
|
|
t.submit(id, 42).result()
|
|
""".format(executor_type=self.executor_type.__name__,
|
|
context=getattr(self, "ctx", "")))
|
|
# Errors in atexit hooks don't change the process exit code, check
|
|
# stderr manually.
|
|
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
|
|
self.assertEqual(out.strip(), b"runtime-error")
|
|
|
|
def test_hang_issue12364(self):
|
|
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
|
|
self.executor.shutdown()
|
|
for f in fs:
|
|
f.result()
|
|
|
|
def test_cancel_futures(self):
|
|
assert self.worker_count <= 5, "test needs few workers"
|
|
fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
|
|
self.executor.shutdown(cancel_futures=True)
|
|
# We can't guarantee the exact number of cancellations, but we can
|
|
# guarantee that *some* were cancelled. With few workers, many of
|
|
# the submitted futures should have been cancelled.
|
|
cancelled = [fut for fut in fs if fut.cancelled()]
|
|
self.assertGreater(len(cancelled), 20)
|
|
|
|
# Ensure the other futures were able to finish.
|
|
# Use "not fut.cancelled()" instead of "fut.done()" to include futures
|
|
# that may have been left in a pending state.
|
|
others = [fut for fut in fs if not fut.cancelled()]
|
|
for fut in others:
|
|
self.assertTrue(fut.done(), msg=f"{fut._state=}")
|
|
self.assertIsNone(fut.exception())
|
|
|
|
# Similar to the number of cancelled futures, we can't guarantee the
|
|
# exact number that completed. But, we can guarantee that at least
|
|
# one finished.
|
|
self.assertGreater(len(others), 0)
|
|
|
|
def test_hang_gh83386(self):
|
|
"""shutdown(wait=False) doesn't hang at exit with running futures.
|
|
|
|
See https://github.com/python/cpython/issues/83386.
|
|
"""
|
|
if self.executor_type == futures.ProcessPoolExecutor:
|
|
raise unittest.SkipTest(
|
|
"Hangs, see https://github.com/python/cpython/issues/83386")
|
|
|
|
rc, out, err = assert_python_ok('-c', """if True:
|
|
from concurrent.futures import {executor_type}
|
|
from test.test_concurrent_futures.test_shutdown import sleep_and_print
|
|
if __name__ == "__main__":
|
|
if {context!r}: multiprocessing.set_start_method({context!r})
|
|
t = {executor_type}(max_workers=3)
|
|
t.submit(sleep_and_print, 1.0, "apple")
|
|
t.shutdown(wait=False)
|
|
""".format(executor_type=self.executor_type.__name__,
|
|
context=getattr(self, 'ctx', None)))
|
|
self.assertFalse(err)
|
|
self.assertEqual(out.strip(), b"apple")
|
|
|
|
def test_hang_gh94440(self):
|
|
"""shutdown(wait=True) doesn't hang when a future was submitted and
|
|
quickly canceled right before shutdown.
|
|
|
|
See https://github.com/python/cpython/issues/94440.
|
|
"""
|
|
if not hasattr(signal, 'alarm'):
|
|
raise unittest.SkipTest(
|
|
"Tested platform does not support the alarm signal")
|
|
|
|
def timeout(_signum, _frame):
|
|
raise RuntimeError("timed out waiting for shutdown")
|
|
|
|
kwargs = {}
|
|
if getattr(self, 'ctx', None):
|
|
kwargs['mp_context'] = self.get_context()
|
|
executor = self.executor_type(max_workers=1, **kwargs)
|
|
executor.submit(int).result()
|
|
old_handler = signal.signal(signal.SIGALRM, timeout)
|
|
try:
|
|
signal.alarm(5)
|
|
executor.submit(int).cancel()
|
|
executor.shutdown(wait=True)
|
|
finally:
|
|
signal.alarm(0)
|
|
signal.signal(signal.SIGALRM, old_handler)
|
|
|
|
|
|
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
|
|
def test_threads_terminate(self):
|
|
def acquire_lock(lock):
|
|
lock.acquire()
|
|
|
|
sem = threading.Semaphore(0)
|
|
for i in range(3):
|
|
self.executor.submit(acquire_lock, sem)
|
|
self.assertEqual(len(self.executor._threads), 3)
|
|
for i in range(3):
|
|
sem.release()
|
|
self.executor.shutdown()
|
|
for t in self.executor._threads:
|
|
t.join()
|
|
|
|
def test_context_manager_shutdown(self):
|
|
with futures.ThreadPoolExecutor(max_workers=5) as e:
|
|
executor = e
|
|
self.assertEqual(list(e.map(abs, range(-5, 5))),
|
|
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
|
|
|
|
for t in executor._threads:
|
|
t.join()
|
|
|
|
def test_del_shutdown(self):
|
|
executor = futures.ThreadPoolExecutor(max_workers=5)
|
|
res = executor.map(abs, range(-5, 5))
|
|
threads = executor._threads
|
|
del executor
|
|
|
|
for t in threads:
|
|
t.join()
|
|
|
|
# Make sure the results were all computed before the
|
|
# executor got shutdown.
|
|
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
|
|
|
def test_shutdown_no_wait(self):
|
|
# Ensure that the executor cleans up the threads when calling
|
|
# shutdown with wait=False
|
|
executor = futures.ThreadPoolExecutor(max_workers=5)
|
|
res = executor.map(abs, range(-5, 5))
|
|
threads = executor._threads
|
|
executor.shutdown(wait=False)
|
|
for t in threads:
|
|
t.join()
|
|
|
|
# Make sure the results were all computed before the
|
|
# executor got shutdown.
|
|
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
|
|
|
|
|
def test_thread_names_assigned(self):
|
|
executor = futures.ThreadPoolExecutor(
|
|
max_workers=5, thread_name_prefix='SpecialPool')
|
|
executor.map(abs, range(-5, 5))
|
|
threads = executor._threads
|
|
del executor
|
|
support.gc_collect() # For PyPy or other GCs.
|
|
|
|
for t in threads:
|
|
self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
|
|
t.join()
|
|
|
|
def test_thread_names_default(self):
|
|
executor = futures.ThreadPoolExecutor(max_workers=5)
|
|
executor.map(abs, range(-5, 5))
|
|
threads = executor._threads
|
|
del executor
|
|
support.gc_collect() # For PyPy or other GCs.
|
|
|
|
for t in threads:
|
|
# Ensure that our default name is reasonably sane and unique when
|
|
# no thread_name_prefix was supplied.
|
|
self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
|
|
t.join()
|
|
|
|
def test_cancel_futures_wait_false(self):
|
|
# Can only be reliably tested for TPE, since PPE often hangs with
|
|
# `wait=False` (even without *cancel_futures*).
|
|
rc, out, err = assert_python_ok('-c', """if True:
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from test.test_concurrent_futures.test_shutdown import sleep_and_print
|
|
if __name__ == "__main__":
|
|
t = ThreadPoolExecutor()
|
|
t.submit(sleep_and_print, .1, "apple")
|
|
t.shutdown(wait=False, cancel_futures=True)
|
|
""")
|
|
# Errors in atexit hooks don't change the process exit code, check
|
|
# stderr manually.
|
|
self.assertFalse(err)
|
|
# gh-116682: stdout may be empty if shutdown happens before task
|
|
# starts executing.
|
|
self.assertIn(out.strip(), [b"apple", b""])
|
|
|
|
|
|
class ProcessPoolShutdownTest(ExecutorShutdownTest):
|
|
def test_processes_terminate(self):
|
|
def acquire_lock(lock):
|
|
lock.acquire()
|
|
|
|
mp_context = self.get_context()
|
|
if mp_context.get_start_method(allow_none=False) == "fork":
|
|
# fork pre-spawns, not on demand.
|
|
expected_num_processes = self.worker_count
|
|
else:
|
|
expected_num_processes = 3
|
|
|
|
sem = mp_context.Semaphore(0)
|
|
for _ in range(3):
|
|
self.executor.submit(acquire_lock, sem)
|
|
self.assertEqual(len(self.executor._processes), expected_num_processes)
|
|
for _ in range(3):
|
|
sem.release()
|
|
processes = self.executor._processes
|
|
self.executor.shutdown()
|
|
|
|
for p in processes.values():
|
|
p.join()
|
|
|
|
def test_context_manager_shutdown(self):
|
|
with futures.ProcessPoolExecutor(
|
|
max_workers=5, mp_context=self.get_context()) as e:
|
|
processes = e._processes
|
|
self.assertEqual(list(e.map(abs, range(-5, 5))),
|
|
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
|
|
|
|
for p in processes.values():
|
|
p.join()
|
|
|
|
def test_del_shutdown(self):
|
|
executor = futures.ProcessPoolExecutor(
|
|
max_workers=5, mp_context=self.get_context())
|
|
res = executor.map(abs, range(-5, 5))
|
|
executor_manager_thread = executor._executor_manager_thread
|
|
processes = executor._processes
|
|
call_queue = executor._call_queue
|
|
executor_manager_thread = executor._executor_manager_thread
|
|
del executor
|
|
support.gc_collect() # For PyPy or other GCs.
|
|
|
|
# Make sure that all the executor resources were properly cleaned by
|
|
# the shutdown process
|
|
executor_manager_thread.join()
|
|
for p in processes.values():
|
|
p.join()
|
|
call_queue.join_thread()
|
|
|
|
# Make sure the results were all computed before the
|
|
# executor got shutdown.
|
|
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
|
|
|
def test_shutdown_no_wait(self):
|
|
# Ensure that the executor cleans up the processes when calling
|
|
# shutdown with wait=False
|
|
executor = futures.ProcessPoolExecutor(
|
|
max_workers=5, mp_context=self.get_context())
|
|
res = executor.map(abs, range(-5, 5))
|
|
processes = executor._processes
|
|
call_queue = executor._call_queue
|
|
executor_manager_thread = executor._executor_manager_thread
|
|
executor.shutdown(wait=False)
|
|
|
|
# Make sure that all the executor resources were properly cleaned by
|
|
# the shutdown process
|
|
executor_manager_thread.join()
|
|
for p in processes.values():
|
|
p.join()
|
|
call_queue.join_thread()
|
|
|
|
# Make sure the results were all computed before the executor got
|
|
# shutdown.
|
|
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
|
|
|
|
|
create_executor_tests(globals(), ProcessPoolShutdownTest,
|
|
executor_mixins=(ProcessPoolForkMixin,
|
|
ProcessPoolForkserverMixin,
|
|
ProcessPoolSpawnMixin))
|
|
|
|
|
|
def setUpModule():
|
|
setup_module()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|