import contextlib
import logging
import queue
import time
import unittest
import sys
import io
from concurrent.futures._base import BrokenExecutor
from concurrent.futures.process import _check_system_limits

from logging.handlers import QueueHandler

from test import support

from .util import ExecutorMixin, create_executor_tests, setup_module


# Module-level marker written by init() and read back by get_init_status().
INITIALIZER_STATUS = 'uninitialized'


def init(x):
    """Store *x* in the module-level ``INITIALIZER_STATUS``.

    Passed as the executor ``initializer`` by ``InitializerMixin.setUp()``.
    """
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x


def get_init_status():
    """Return the current value of the module-level ``INITIALIZER_STATUS``."""
    return INITIALIZER_STATUS


def init_fail(log_queue=None):
    """Executor initializer that always raises ``ValueError``.

    If *log_queue* is given, the ``'concurrent.futures'`` logger gets a
    ``QueueHandler`` feeding that queue, its level is set to ``CRITICAL``
    and propagation is turned off, so the records can be read back from the
    queue by ``FailingInitializerMixin._assert_logged()``.
    """
    if log_queue is not None:
        logger = logging.getLogger('concurrent.futures')
        logger.addHandler(QueueHandler(log_queue))
        logger.setLevel('CRITICAL')
        logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')


class InitializerMixin(ExecutorMixin):
    """Check that a working initializer has run before submitted tasks."""

    worker_count = 2

    def setUp(self):
        """Reset the status marker and configure the executor with init()."""
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        # Submit one task per worker; each must report the value that
        # init() stored.
        futures = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for f in futures:
            self.assertEqual(f.result(), 'initialized')


class FailingInitializerMixin(ExecutorMixin):
    """Check that a raising initializer breaks the executor and is logged."""

    worker_count = 2

    def setUp(self):
        """Configure the executor with init_fail(), plus a log queue if
        the test class has a ``ctx`` attribute."""
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()

            # At some point, the executor should break
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
                                            "executor not broken"):
                if self.executor._broken:
                    break

            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        """Context manager asserting that *msg* was logged by the body.

        With a log queue, the messages are drained from the queue once the
        body has finished; otherwise ``assertLogs()`` captures CRITICAL
        records of the ``'concurrent.futures'`` logger directly.  Either
        way, at least one captured line must contain *msg*.
        """
        if self.log_queue is not None:
            yield
            output = []
            try:
                # Drain everything currently queued; queue.Empty ends the
                # loop.
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)


# NOTE(review): presumably these calls add the concrete test classes to this
# module's globals -- the ProcessPool*FailingInitializerTest names used below
# are not defined anywhere else in this file.  Confirm against .util.
create_executor_tests(globals(), InitializerMixin)
create_executor_tests(globals(), FailingInitializerMixin)


@unittest.skipIf(sys.platform == "win32",
                 "Resource Tracker doesn't run on Windows")
class FailingInitializerResourcesTest(unittest.TestCase):
    """
    Check that the resource tracker exits with code 0 after a failing
    initializer test has been run.

    Source: https://github.com/python/cpython/issues/104090
    """

    def _test(self, test_class):
        """Run ``test_class('test_initializer')`` with its output discarded,
        then stop the resource tracker and check its exit code."""
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")

        # The nested run writes its report to a throwaway buffer; its result
        # is not inspected here, only the resource tracker state afterwards.
        runner = unittest.TextTestRunner(stream=io.StringIO())
        runner.run(test_class('test_initializer'))

        # GH-104090:
        # Stop resource tracker manually now, so we can verify there are no
        # leaked resources by checking the process exit code
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker._stop()

        self.assertEqual(_resource_tracker._exitcode, 0)

    def test_spawn(self):
        self._test(ProcessPoolSpawnFailingInitializerTest)

    @support.skip_if_sanitizer("TSAN doesn't support threads after fork",
                               thread=True)
    def test_forkserver(self):
        self._test(ProcessPoolForkserverFailingInitializerTest)


def setUpModule():
    """Module-level unittest fixture; delegates to ``setup_module()``."""
    setup_module()


if __name__ == "__main__":
    unittest.main()