diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h index ac7ff83748d..38d0897ea13 100644 --- a/Include/cpython/pystate.h +++ b/Include/cpython/pystate.h @@ -161,32 +161,6 @@ struct _ts { */ uintptr_t critical_section; - /* Called when a thread state is deleted normally, but not when it - * is destroyed after fork(). - * Pain: to prevent rare but fatal shutdown errors (issue 18808), - * Thread.join() must wait for the join'ed thread's tstate to be unlinked - * from the tstate chain. That happens at the end of a thread's life, - * in pystate.c. - * The obvious way doesn't quite work: create a lock which the tstate - * unlinking code releases, and have Thread.join() wait to acquire that - * lock. The problem is that we _are_ at the end of the thread's life: - * if the thread holds the last reference to the lock, decref'ing the - * lock will delete the lock, and that may trigger arbitrary Python code - * if there's a weakref, with a callback, to the lock. But by this time - * _PyRuntime.gilstate.tstate_current is already NULL, so only the simplest - * of C code can be allowed to run (in particular it must not be possible to - * release the GIL). - * So instead of holding the lock directly, the tstate holds a weakref to - * the lock: that's the value of on_delete_data below. Decref'ing a - * weakref is harmless. - * on_delete points to _threadmodule.c's static release_sentinel() function. - * After the tstate is unlinked, release_sentinel is called with the - * weakref-to-lock (on_delete_data) argument, and release_sentinel releases - * the indirectly held lock. - */ - void (*on_delete)(void *); - void *on_delete_data; - int coroutine_origin_tracking_depth; PyObject *async_gen_firstiter; diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h index 971b4611a87..f993c95ecbf 100644 --- a/Include/internal/pycore_lock.h +++ b/Include/internal/pycore_lock.h @@ -153,16 +153,6 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt); // and 0 if the timeout expired or thread was interrupted. PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns); -// A one-time event notification with reference counting. -typedef struct _PyEventRc { - PyEvent event; - Py_ssize_t refcount; -} _PyEventRc; - -_PyEventRc *_PyEventRc_New(void); -void _PyEventRc_Incref(_PyEventRc *erc); -void _PyEventRc_Decref(_PyEventRc *erc); - // _PyRawMutex implements a word-sized mutex that that does not depend on the // parking lot API, and therefore can be used in the parking lot // implementation. diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index d2e7cc2a206..f032cb97388 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -78,7 +78,7 @@ struct _pythread_runtime_state { } stubs; #endif - // Linked list of ThreadHandleObjects + // Linked list of ThreadHandles struct llist_node handles; }; diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py index cd0a4e22648..c24c8213924 100644 --- a/Lib/test/test_audit.py +++ b/Lib/test/test_audit.py @@ -209,7 +209,7 @@ class AuditTest(unittest.TestCase): expected = [ ("_thread.start_new_thread", "(, (), None)"), ("test.test_func", "()"), - ("_thread.start_joinable_thread", "(,)"), + ("_thread.start_joinable_thread", "(, 1, None)"), ("test.test_func", "()"), ] diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 7fc59a05f3d..70444bb147f 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -201,13 +201,13 @@ class ProcessPoolExecutorTest(ExecutorTest): # QueueFeederThread. orig_start_new_thread = threading._start_joinable_thread nthread = 0 - def mock_start_new_thread(func, *args): + def mock_start_new_thread(func, *args, **kwargs): nonlocal nthread if nthread >= 1: raise RuntimeError("can't create new thread at " "interpreter shutdown") nthread += 1 - return orig_start_new_thread(func, *args) + return orig_start_new_thread(func, *args, **kwargs) with support.swap_attr(threading, '_start_joinable_thread', mock_start_new_thread): diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 83235230d5c..d94e04250c9 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -289,6 +289,54 @@ class ThreadRunningTests(BasicThreadTest): with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): raise error + def test_join_with_timeout(self): + lock = thread.allocate_lock() + lock.acquire() + + def thr(): + lock.acquire() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(thr) + handle.join(0.1) + self.assertFalse(handle.is_done()) + lock.release() + handle.join() + self.assertTrue(handle.is_done()) + + def test_join_unstarted(self): + handle = thread._ThreadHandle() + with self.assertRaisesRegex(RuntimeError, "thread not started"): + handle.join() + + def test_set_done_unstarted(self): + handle = thread._ThreadHandle() + with self.assertRaisesRegex(RuntimeError, "thread not started"): + handle._set_done() + + def test_start_duplicate_handle(self): + lock = thread.allocate_lock() + lock.acquire() + + def func(): + lock.acquire() + + handle = thread._ThreadHandle() + with threading_helper.wait_threads_exit(): + thread.start_joinable_thread(func, handle=handle) + with self.assertRaisesRegex(RuntimeError, "thread already started"): + thread.start_joinable_thread(func, handle=handle) + lock.release() + handle.join() + + def test_start_with_none_handle(self): + def func(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(func, handle=None) + handle.join() + class Barrier: def __init__(self, num_threads): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 886866626dc..f1dc12944cb 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -408,7 +408,7 @@ class ThreadTests(BaseTestCase): def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. - def fail_new_thread(*args): + def fail_new_thread(*args, **kwargs): raise threading.ThreadError() _start_joinable_thread = threading._start_joinable_thread threading._start_joinable_thread = fail_new_thread @@ -912,41 +912,6 @@ class ThreadTests(BaseTestCase): rc, out, err = assert_python_ok("-c", code) self.assertEqual(err, b"") - def test_tstate_lock(self): - # Test an implementation detail of Thread objects. - started = _thread.allocate_lock() - finish = _thread.allocate_lock() - started.acquire() - finish.acquire() - def f(): - started.release() - finish.acquire() - time.sleep(0.01) - # The tstate lock is None until the thread is started - t = threading.Thread(target=f) - self.assertIs(t._tstate_lock, None) - t.start() - started.acquire() - self.assertTrue(t.is_alive()) - # The tstate lock can't be acquired when the thread is running - # (or suspended). - tstate_lock = t._tstate_lock - self.assertFalse(tstate_lock.acquire(timeout=0), False) - finish.release() - # When the thread ends, the state_lock can be successfully - # acquired. - self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False) - # But is_alive() is still True: we hold _tstate_lock now, which - # prevents is_alive() from knowing the thread's end-of-life C code - # is done. - self.assertTrue(t.is_alive()) - # Let is_alive() find out the C code is done. - tstate_lock.release() - self.assertFalse(t.is_alive()) - # And verify the thread disposed of _tstate_lock. - self.assertIsNone(t._tstate_lock) - t.join() - def test_repr_stopped(self): # Verify that "stopped" shows up in repr(Thread) appropriately. started = _thread.allocate_lock() @@ -1112,30 +1077,6 @@ class ThreadTests(BaseTestCase): self.assertEqual(threading.getprofile(), old_profile) self.assertEqual(sys.getprofile(), old_profile) - @cpython_only - def test_shutdown_locks(self): - for daemon in (False, True): - with self.subTest(daemon=daemon): - event = threading.Event() - thread = threading.Thread(target=event.wait, daemon=daemon) - - # Thread.start() must add lock to _shutdown_locks, - # but only for non-daemon thread - thread.start() - tstate_lock = thread._tstate_lock - if not daemon: - self.assertIn(tstate_lock, threading._shutdown_locks) - else: - self.assertNotIn(tstate_lock, threading._shutdown_locks) - - # unblock the thread and join it - event.set() - thread.join() - - # Thread._stop() must remove tstate_lock from _shutdown_locks. - # Daemon threads must never add it to _shutdown_locks. - self.assertNotIn(tstate_lock, threading._shutdown_locks) - def test_locals_at_exit(self): # bpo-19466: thread locals must not be deleted before destructors # are called diff --git a/Lib/threading.py b/Lib/threading.py index ec89550d6b0..31ab77c92b1 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -36,8 +36,11 @@ _start_joinable_thread = _thread.start_joinable_thread _daemon_threads_allowed = _thread.daemon_threads_allowed _allocate_lock = _thread.allocate_lock _LockType = _thread.LockType -_set_sentinel = _thread._set_sentinel +_thread_shutdown = _thread._shutdown +_make_thread_handle = _thread._make_thread_handle +_ThreadHandle = _thread._ThreadHandle get_ident = _thread.get_ident +_get_main_thread_ident = _thread._get_main_thread_ident _is_main_interpreter = _thread._is_main_interpreter try: get_native_id = _thread.get_native_id @@ -847,25 +850,6 @@ _active = {} # maps thread id to Thread object _limbo = {} _dangling = WeakSet() -# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() -# to wait until all Python thread states get deleted: -# see Thread._set_tstate_lock(). -_shutdown_locks_lock = _allocate_lock() -_shutdown_locks = set() - -def _maintain_shutdown_locks(): - """ - Drop any shutdown locks that don't correspond to running threads anymore. - - Calling this from time to time avoids an ever-growing _shutdown_locks - set when Thread objects are not joined explicitly. See bpo-37788. - - This must be called with _shutdown_locks_lock acquired. - """ - # If a lock was released, the corresponding thread has exited - to_remove = [lock for lock in _shutdown_locks if not lock.locked()] - _shutdown_locks.difference_update(to_remove) - # Main class for threads @@ -930,10 +914,8 @@ class Thread: self._ident = None if _HAVE_THREAD_NATIVE_ID: self._native_id = None - self._tstate_lock = None - self._handle = None + self._handle = _ThreadHandle() self._started = Event() - self._is_stopped = False self._initialized = True # Copy of sys.stderr used by self._invoke_excepthook() self._stderr = _sys.stderr @@ -947,28 +929,18 @@ class Thread: if new_ident is not None: # This thread is alive. self._ident = new_ident - if self._handle is not None: - assert self._handle.ident == new_ident - # bpo-42350: If the fork happens when the thread is already stopped - # (ex: after threading._shutdown() has been called), _tstate_lock - # is None. Do nothing in this case. - if self._tstate_lock is not None: - self._tstate_lock._at_fork_reinit() - self._tstate_lock.acquire() + assert self._handle.ident == new_ident else: - # This thread isn't alive after fork: it doesn't have a tstate - # anymore. - self._is_stopped = True - self._tstate_lock = None - self._handle = None + # Otherwise, the thread is dead, Jim. _PyThread_AfterFork() + # already marked our handle done. + pass def __repr__(self): assert self._initialized, "Thread.__init__() was not called" status = "initial" if self._started.is_set(): status = "started" - self.is_alive() # easy way to get ._is_stopped set when appropriate - if self._is_stopped: + if self._handle.is_done(): status = "stopped" if self._daemonic: status += " daemon" @@ -996,7 +968,8 @@ class Thread: _limbo[self] = self try: # Start joinable thread - self._handle = _start_joinable_thread(self._bootstrap) + _start_joinable_thread(self._bootstrap, handle=self._handle, + daemon=self.daemon) except Exception: with _active_limbo_lock: del _limbo[self] @@ -1047,23 +1020,9 @@ class Thread: def _set_native_id(self): self._native_id = get_native_id() - def _set_tstate_lock(self): - """ - Set a lock object which will be released by the interpreter when - the underlying thread state (see pystate.h) gets deleted. - """ - self._tstate_lock = _set_sentinel() - self._tstate_lock.acquire() - - if not self.daemon: - with _shutdown_locks_lock: - _maintain_shutdown_locks() - _shutdown_locks.add(self._tstate_lock) - def _bootstrap_inner(self): try: self._set_ident() - self._set_tstate_lock() if _HAVE_THREAD_NATIVE_ID: self._set_native_id() self._started.set() @@ -1083,33 +1042,6 @@ class Thread: finally: self._delete() - def _stop(self): - # After calling ._stop(), .is_alive() returns False and .join() returns - # immediately. ._tstate_lock must be released before calling ._stop(). - # - # Normal case: C code at the end of the thread's life - # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and - # that's detected by our ._wait_for_tstate_lock(), called by .join() - # and .is_alive(). Any number of threads _may_ call ._stop() - # simultaneously (for example, if multiple threads are blocked in - # .join() calls), and they're not serialized. That's harmless - - # they'll just make redundant rebindings of ._is_stopped and - # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the - # "assert self._is_stopped" in ._wait_for_tstate_lock() always works - # (the assert is executed only if ._tstate_lock is None). - # - # Special case: _main_thread releases ._tstate_lock via this - # module's _shutdown() function. - lock = self._tstate_lock - if lock is not None: - assert not lock.locked() - self._is_stopped = True - self._tstate_lock = None - if not self.daemon: - with _shutdown_locks_lock: - # Remove our lock and other released locks from _shutdown_locks - _maintain_shutdown_locks() - def _delete(self): "Remove current thread from the dict of currently running threads." with _active_limbo_lock: @@ -1150,47 +1082,12 @@ class Thread: if self is current_thread(): raise RuntimeError("cannot join current thread") - if timeout is None: - self._wait_for_tstate_lock() - else: - # the behavior of a negative timeout isn't documented, but - # historically .join(timeout=x) for x<0 has acted as if timeout=0 - self._wait_for_tstate_lock(timeout=max(timeout, 0)) + # the behavior of a negative timeout isn't documented, but + # historically .join(timeout=x) for x<0 has acted as if timeout=0 + if timeout is not None: + timeout = max(timeout, 0) - if self._is_stopped: - self._join_os_thread() - - def _join_os_thread(self): - # self._handle may be cleared post-fork - if self._handle is not None: - self._handle.join() - - def _wait_for_tstate_lock(self, block=True, timeout=-1): - # Issue #18808: wait for the thread state to be gone. - # At the end of the thread's life, after all knowledge of the thread - # is removed from C data structures, C code releases our _tstate_lock. - # This method passes its arguments to _tstate_lock.acquire(). - # If the lock is acquired, the C code is done, and self._stop() is - # called. That sets ._is_stopped to True, and ._tstate_lock to None. - lock = self._tstate_lock - if lock is None: - # already determined that the C code is done - assert self._is_stopped - return - - try: - if lock.acquire(block, timeout): - lock.release() - self._stop() - except: - if lock.locked(): - # bpo-45274: lock.acquire() acquired the lock, but the function - # was interrupted with an exception before reaching the - # lock.release(). It can happen if a signal handler raises an - # exception, like CTRL+C which raises KeyboardInterrupt. - lock.release() - self._stop() - raise + self._handle.join(timeout) @property def name(self): @@ -1241,13 +1138,7 @@ class Thread: """ assert self._initialized, "Thread.__init__() not called" - if self._is_stopped or not self._started.is_set(): - return False - self._wait_for_tstate_lock(False) - if not self._is_stopped: - return True - self._join_os_thread() - return False + return self._started.is_set() and not self._handle.is_done() @property def daemon(self): @@ -1456,18 +1347,14 @@ class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread", daemon=False) - self._set_tstate_lock() self._started.set() - self._set_ident() + self._ident = _get_main_thread_ident() + self._handle = _make_thread_handle(self._ident) if _HAVE_THREAD_NATIVE_ID: self._set_native_id() with _active_limbo_lock: _active[self._ident] = self - def _join_os_thread(self): - # No ThreadHandle for main thread - pass - # Helper thread-local instance to detect when a _DummyThread # is collected. Not a part of the public API. @@ -1510,17 +1397,15 @@ class _DummyThread(Thread): daemon=_daemon_threads_allowed()) self._started.set() self._set_ident() + self._handle = _make_thread_handle(self._ident) if _HAVE_THREAD_NATIVE_ID: self._set_native_id() with _active_limbo_lock: _active[self._ident] = self _DeleteDummyThreadOnDel(self) - def _stop(self): - pass - def is_alive(self): - if not self._is_stopped and self._started.is_set(): + if not self._handle.is_done() and self._started.is_set(): return True raise RuntimeError("thread is not alive") @@ -1532,7 +1417,6 @@ class _DummyThread(Thread): self.__class__ = _MainThread self._name = 'MainThread' self._daemonic = False - self._set_tstate_lock() Thread._after_fork(self, new_ident=new_ident) @@ -1631,12 +1515,11 @@ def _shutdown(): """ Wait until the Python thread state of all non-daemon threads get deleted. """ - # Obscure: other threads may be waiting to join _main_thread. That's - # dubious, but some code does it. We can't wait for C code to release - # the main thread's tstate_lock - that won't happen until the interpreter - # is nearly dead. So we release it here. Note that just calling _stop() - # isn't enough: other threads may already be waiting on _tstate_lock. - if _main_thread._is_stopped and _is_main_interpreter(): + # Obscure: other threads may be waiting to join _main_thread. That's + # dubious, but some code does it. We can't wait for it to be marked as done + # normally - that won't happen until the interpreter is nearly dead. So + # mark it done here. + if _main_thread._handle.is_done() and _is_main_interpreter(): # _shutdown() was already called return @@ -1648,42 +1531,11 @@ def _shutdown(): for atexit_call in reversed(_threading_atexits): atexit_call() - # Main thread - if _main_thread.ident == get_ident(): - tlock = _main_thread._tstate_lock - # The main thread isn't finished yet, so its thread state lock can't - # have been released. - assert tlock is not None - if tlock.locked(): - # It should have been released already by - # _PyInterpreterState_SetNotRunningMain(), but there may be - # embedders that aren't calling that yet. - tlock.release() - _main_thread._stop() - else: - # bpo-1596321: _shutdown() must be called in the main thread. - # If the threading module was not imported by the main thread, - # _main_thread is the thread which imported the threading module. - # In this case, ignore _main_thread, similar behavior than for threads - # spawned by C libraries or using _thread.start_new_thread(). - pass + if _is_main_interpreter(): + _main_thread._handle._set_done() - # Join all non-deamon threads - while True: - with _shutdown_locks_lock: - locks = list(_shutdown_locks) - _shutdown_locks.clear() - - if not locks: - break - - for lock in locks: - # mimic Thread.join() - lock.acquire() - lock.release() - - # new threads can be spawned while we were waiting for the other - # threads to complete + # Wait for all non-daemon threads to exit. + _thread_shutdown() def main_thread(): @@ -1703,7 +1555,6 @@ def _after_fork(): # Reset _active_limbo_lock, in case we forked while the lock was held # by another (non-forked) thread. http://bugs.python.org/issue874900 global _active_limbo_lock, _main_thread - global _shutdown_locks_lock, _shutdown_locks _active_limbo_lock = RLock() # fork() only copied the current thread; clear references to others. @@ -1719,10 +1570,6 @@ def _after_fork(): _main_thread = current - # reset _shutdown() locks: threads re-register their _tstate_lock below - _shutdown_locks_lock = _allocate_lock() - _shutdown_locks = set() - with _active_limbo_lock: # Dangling thread instances must still have their locks reset, # because someone may join() them. @@ -1739,7 +1586,6 @@ def _after_fork(): else: # All the others are already stopped. thread._after_fork() - thread._stop() _limbo.clear() _active.clear() diff --git a/Misc/NEWS.d/next/Library/2024-02-01-03-09-38.gh-issue-114271.raCkt5.rst b/Misc/NEWS.d/next/Library/2024-02-01-03-09-38.gh-issue-114271.raCkt5.rst new file mode 100644 index 00000000000..2cd35eeda83 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-02-01-03-09-38.gh-issue-114271.raCkt5.rst @@ -0,0 +1,7 @@ +Fix a race in ``threading.Thread.join()``. + +``threading._MainThread`` now always represents the main thread of the main +interpreter. + +``PyThreadState.on_delete`` and ``PyThreadState.on_delete_data`` have been +removed. diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index cc5396a0350..6c84b7e08e2 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -12,7 +12,6 @@ #include "pycore_time.h" // _PyTime_FromSeconds() #include "pycore_weakref.h" // _PyWeakref_GET_REF() -#include #include // offsetof() #ifdef HAVE_SIGNAL_H # include // SIGINT @@ -21,7 +20,6 @@ // ThreadError is just an alias to PyExc_RuntimeError #define ThreadError PyExc_RuntimeError - // Forward declarations static struct PyModuleDef thread_module; @@ -32,6 +30,10 @@ typedef struct { PyTypeObject *local_type; PyTypeObject *local_dummy_type; PyTypeObject *thread_handle_type; + + // Linked list of handles to all non-daemon threads created by the + // threading module. We wait for these to finish at shutdown. + struct llist_node shutdown_handles; } thread_module_state; static inline thread_module_state* @@ -44,76 +46,148 @@ get_thread_state(PyObject *module) // _ThreadHandle type -// Handles transition from RUNNING to one of JOINED, DETACHED, or INVALID (post -// fork). +// Handles state transitions according to the following diagram: +// +// NOT_STARTED -> STARTING -> RUNNING -> DONE +// | ^ +// | | +// +----- error --------+ typedef enum { - THREAD_HANDLE_RUNNING = 1, - THREAD_HANDLE_JOINED = 2, - THREAD_HANDLE_DETACHED = 3, - THREAD_HANDLE_INVALID = 4, + THREAD_HANDLE_NOT_STARTED = 1, + THREAD_HANDLE_STARTING = 2, + THREAD_HANDLE_RUNNING = 3, + THREAD_HANDLE_DONE = 4, } ThreadHandleState; -// A handle around an OS thread. +// A handle to wait for thread completion. // -// The OS thread is either joined or detached after the handle is destroyed. +// This may be used to wait for threads that were spawned by the threading +// module as well as for the "main" thread of the threading module. In the +// former case an OS thread, identified by the `os_handle` field, will be +// associated with the handle. The handle "owns" this thread and ensures that +// the thread is either joined or detached after the handle is destroyed. // -// Joining the handle is idempotent; the underlying OS thread is joined or -// detached only once. Concurrent join operations are serialized until it is -// their turn to execute or an earlier operation completes successfully. Once a -// join has completed successfully all future joins complete immediately. +// Joining the handle is idempotent; the underlying OS thread, if any, is +// joined or detached only once. Concurrent join operations are serialized +// until it is their turn to execute or an earlier operation completes +// successfully. Once a join has completed successfully all future joins +// complete immediately. +// +// This must be separately reference counted because it may be destroyed +// in `thread_run()` after the PyThreadState has been destroyed. typedef struct { - PyObject_HEAD struct llist_node node; // linked list node (see _pythread_runtime_state) - // The `ident` and `handle` fields are immutable once the object is visible - // to threads other than its creator, thus they do not need to be accessed - // atomically. + // linked list node (see thread_module_state) + struct llist_node shutdown_node; + + // The `ident`, `os_handle`, `has_os_handle`, and `state` fields are + // protected by `mutex`. PyThread_ident_t ident; - PyThread_handle_t handle; + PyThread_handle_t os_handle; + int has_os_handle; // Holds a value from the `ThreadHandleState` enum. int state; + PyMutex mutex; + // Set immediately before `thread_run` returns to indicate that the OS // thread is about to exit. This is used to avoid false positives when // detecting self-join attempts. See the comment in `ThreadHandle_join()` // for a more detailed explanation. - _PyEventRc *thread_is_exiting; + PyEvent thread_is_exiting; - // Serializes calls to `join`. + // Serializes calls to `join` and `set_done`. _PyOnceFlag once; -} ThreadHandleObject; + + Py_ssize_t refcount; +} ThreadHandle; static inline int -get_thread_handle_state(ThreadHandleObject *handle) +get_thread_handle_state(ThreadHandle *handle) { - return _Py_atomic_load_int(&handle->state); + PyMutex_Lock(&handle->mutex); + int state = handle->state; + PyMutex_Unlock(&handle->mutex); + return state; } static inline void -set_thread_handle_state(ThreadHandleObject *handle, ThreadHandleState state) +set_thread_handle_state(ThreadHandle *handle, ThreadHandleState state) { - _Py_atomic_store_int(&handle->state, state); + PyMutex_Lock(&handle->mutex); + handle->state = state; + PyMutex_Unlock(&handle->mutex); } -static ThreadHandleObject* -new_thread_handle(thread_module_state* state) +static PyThread_ident_t +ThreadHandle_ident(ThreadHandle *handle) { - _PyEventRc *event = _PyEventRc_New(); - if (event == NULL) { + PyMutex_Lock(&handle->mutex); + PyThread_ident_t ident = handle->ident; + PyMutex_Unlock(&handle->mutex); + return ident; +} + +static int +ThreadHandle_get_os_handle(ThreadHandle *handle, PyThread_handle_t *os_handle) +{ + PyMutex_Lock(&handle->mutex); + int has_os_handle = handle->has_os_handle; + if (has_os_handle) { + *os_handle = handle->os_handle; + } + PyMutex_Unlock(&handle->mutex); + return has_os_handle; +} + +static void +add_to_shutdown_handles(thread_module_state *state, ThreadHandle *handle) +{ + HEAD_LOCK(&_PyRuntime); + llist_insert_tail(&state->shutdown_handles, &handle->shutdown_node); + HEAD_UNLOCK(&_PyRuntime); +} + +static void +clear_shutdown_handles(thread_module_state *state) +{ + HEAD_LOCK(&_PyRuntime); + struct llist_node *node; + llist_for_each_safe(node, &state->shutdown_handles) { + llist_remove(node); + } + HEAD_UNLOCK(&_PyRuntime); +} + +static void +remove_from_shutdown_handles(ThreadHandle *handle) +{ + HEAD_LOCK(&_PyRuntime); + if (handle->shutdown_node.next != NULL) { + llist_remove(&handle->shutdown_node); + } + HEAD_UNLOCK(&_PyRuntime); +} + +static ThreadHandle * +ThreadHandle_new(void) +{ + ThreadHandle *self = + (ThreadHandle *)PyMem_RawCalloc(1, sizeof(ThreadHandle)); + if (self == NULL) { PyErr_NoMemory(); return NULL; } - ThreadHandleObject* self = PyObject_New(ThreadHandleObject, state->thread_handle_type); - if (self == NULL) { - _PyEventRc_Decref(event); - return NULL; - } self->ident = 0; - self->handle = 0; - self->thread_is_exiting = event; + self->os_handle = 0; + self->has_os_handle = 0; + self->thread_is_exiting = (PyEvent){0}; + self->mutex = (PyMutex){_Py_UNLOCKED}; self->once = (_PyOnceFlag){0}; - self->state = THREAD_HANDLE_INVALID; + self->state = THREAD_HANDLE_NOT_STARTED; + self->refcount = 1; HEAD_LOCK(&_PyRuntime); llist_insert_tail(&_PyRuntime.threads.handles, &self->node); @@ -123,9 +197,34 @@ new_thread_handle(thread_module_state* state) } static void -ThreadHandle_dealloc(ThreadHandleObject *self) +ThreadHandle_incref(ThreadHandle *self) { - PyObject *tp = (PyObject *) Py_TYPE(self); + _Py_atomic_add_ssize(&self->refcount, 1); +} + +static int +detach_thread(ThreadHandle *self) +{ + if (!self->has_os_handle) { + return 0; + } + // This is typically short so no need to release the GIL + if (PyThread_detach_thread(self->os_handle)) { + fprintf(stderr, "detach_thread: failed detaching thread\n"); + return -1; + } + return 0; +} + +// NB: This may be called after the PyThreadState in `thread_run` has been +// deleted; it cannot call anything that relies on a valid PyThreadState +// existing. +static void +ThreadHandle_decref(ThreadHandle *self) +{ + if (_Py_atomic_add_ssize(&self->refcount, -1) > 1) { + return; + } // Remove ourself from the global list of handles HEAD_LOCK(&_PyRuntime); @@ -134,23 +233,17 @@ ThreadHandle_dealloc(ThreadHandleObject *self) } HEAD_UNLOCK(&_PyRuntime); + assert(self->shutdown_node.next == NULL); + // It's safe to access state non-atomically: // 1. This is the destructor; nothing else holds a reference. - // 2. The refcount going to zero is a "synchronizes-with" event; - // all changes from other threads are visible. - if (self->state == THREAD_HANDLE_RUNNING) { - // This is typically short so no need to release the GIL - if (PyThread_detach_thread(self->handle)) { - PyErr_SetString(ThreadError, "Failed detaching thread"); - PyErr_WriteUnraisable(tp); - } - else { - self->state = THREAD_HANDLE_DETACHED; - } + // 2. The refcount going to zero is a "synchronizes-with" event; all + // changes from other threads are visible. + if (self->state == THREAD_HANDLE_RUNNING && !detach_thread(self)) { + self->state = THREAD_HANDLE_DONE; } - _PyEventRc_Decref(self->thread_is_exiting); - PyObject_Free(self); - Py_DECREF(tp); + + PyMem_RawFree(self); } void @@ -164,55 +257,229 @@ _PyThread_AfterFork(struct _pythread_runtime_state *state) struct llist_node *node; llist_for_each_safe(node, &state->handles) { - ThreadHandleObject *hobj = llist_data(node, ThreadHandleObject, node); - if (hobj->ident == current) { + ThreadHandle *handle = llist_data(node, ThreadHandle, node); + if (handle->ident == current) { continue; } - // Disallow calls to join() as they could crash. We are the only - // thread; it's safe to set this without an atomic. - hobj->state = THREAD_HANDLE_INVALID; + // Mark all threads as done. Any attempts to join or detach the + // underlying OS thread (if any) could crash. We are the only thread; + // it's safe to set this non-atomically. + handle->state = THREAD_HANDLE_DONE; + handle->once = (_PyOnceFlag){_Py_ONCE_INITIALIZED}; + handle->mutex = (PyMutex){_Py_UNLOCKED}; + _PyEvent_Notify(&handle->thread_is_exiting); llist_remove(node); + remove_from_shutdown_handles(handle); } } -static PyObject * -ThreadHandle_repr(ThreadHandleObject *self) +// bootstate is used to "bootstrap" new threads. Any arguments needed by +// `thread_run()`, which can only take a single argument due to platform +// limitations, are contained in bootstate. +struct bootstate { + PyThreadState *tstate; + PyObject *func; + PyObject *args; + PyObject *kwargs; + ThreadHandle *handle; + PyEvent handle_ready; +}; + +static void +thread_bootstate_free(struct bootstate *boot, int decref) { - return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">", - Py_TYPE(self)->tp_name, self->ident); + if (decref) { + Py_DECREF(boot->func); + Py_DECREF(boot->args); + Py_XDECREF(boot->kwargs); + } + ThreadHandle_decref(boot->handle); + PyMem_RawFree(boot); } -static PyObject * -ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored) +static void +thread_run(void *boot_raw) { - return PyLong_FromUnsignedLongLong(self->ident); + struct bootstate *boot = (struct bootstate *) boot_raw; + PyThreadState *tstate = boot->tstate; + + // Wait until the handle is marked as running + PyEvent_Wait(&boot->handle_ready); + + // `handle` needs to be manipulated after bootstate has been freed + ThreadHandle *handle = boot->handle; + ThreadHandle_incref(handle); + + // gh-108987: If _thread.start_new_thread() is called before or while + // Python is being finalized, thread_run() can called *after*. + // _PyRuntimeState_SetFinalizing() is called. At this point, all Python + // threads must exit, except of the thread calling Py_Finalize() whch holds + // the GIL and must not exit. + // + // At this stage, tstate can be a dangling pointer (point to freed memory), + // it's ok to call _PyThreadState_MustExit() with a dangling pointer. + if (_PyThreadState_MustExit(tstate)) { + // Don't call PyThreadState_Clear() nor _PyThreadState_DeleteCurrent(). + // These functions are called on tstate indirectly by Py_Finalize() + // which calls _PyInterpreterState_Clear(). + // + // Py_DECREF() cannot be called because the GIL is not held: leak + // references on purpose. Python is being finalized anyway. + thread_bootstate_free(boot, 0); + goto exit; + } + + _PyThreadState_Bind(tstate); + PyEval_AcquireThread(tstate); + _Py_atomic_add_ssize(&tstate->interp->threads.count, 1); + + PyObject *res = PyObject_Call(boot->func, boot->args, boot->kwargs); + if (res == NULL) { + if (PyErr_ExceptionMatches(PyExc_SystemExit)) + /* SystemExit is ignored silently */ + PyErr_Clear(); + else { + PyErr_FormatUnraisable( + "Exception ignored in thread started by %R", boot->func); + } + } + else { + Py_DECREF(res); + } + + thread_bootstate_free(boot, 1); + + _Py_atomic_add_ssize(&tstate->interp->threads.count, -1); + PyThreadState_Clear(tstate); + _PyThreadState_DeleteCurrent(tstate); + +exit: + // Don't need to wait for this thread anymore + remove_from_shutdown_handles(handle); + + _PyEvent_Notify(&handle->thread_is_exiting); + ThreadHandle_decref(handle); + + // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with + // the glibc, pthread_exit() can abort the whole process if dlopen() fails + // to open the libgcc_s.so library (ex: EMFILE error). + return; } static int -join_thread(ThreadHandleObject *handle) +force_done(ThreadHandle *handle) { - assert(get_thread_handle_state(handle) == THREAD_HANDLE_RUNNING); - - int err; - Py_BEGIN_ALLOW_THREADS - err = PyThread_join_thread(handle->handle); - Py_END_ALLOW_THREADS - if (err) { - PyErr_SetString(ThreadError, "Failed joining thread"); - return -1; - } - set_thread_handle_state(handle, THREAD_HANDLE_JOINED); + assert(get_thread_handle_state(handle) == THREAD_HANDLE_STARTING); + _PyEvent_Notify(&handle->thread_is_exiting); + set_thread_handle_state(handle, THREAD_HANDLE_DONE); return 0; } -static PyObject * -ThreadHandle_join(ThreadHandleObject *self, void* ignored) +static int +ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args, + PyObject *kwargs) { - if (get_thread_handle_state(self) == THREAD_HANDLE_INVALID) { - PyErr_SetString(PyExc_ValueError, - "the handle is invalid and thus cannot be joined"); - return NULL; + // Mark the handle as starting to prevent any other threads from doing so + PyMutex_Lock(&self->mutex); + if (self->state != THREAD_HANDLE_NOT_STARTED) { + PyMutex_Unlock(&self->mutex); + PyErr_SetString(ThreadError, "thread already started"); + return -1; + } + self->state = THREAD_HANDLE_STARTING; + PyMutex_Unlock(&self->mutex); + + // Do all the heavy lifting outside of the mutex. All other operations on + // the handle should fail since the handle is in the starting state. + + // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(), + // because it should be possible to call thread_bootstate_free() + // without holding the GIL. + struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate)); + if (boot == NULL) { + PyErr_NoMemory(); + goto start_failed; + } + PyInterpreterState *interp = _PyInterpreterState_GET(); + boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING); + if (boot->tstate == NULL) { + PyMem_RawFree(boot); + if (!PyErr_Occurred()) { + PyErr_NoMemory(); + } + goto start_failed; + } + boot->func = Py_NewRef(func); + boot->args = Py_NewRef(args); + boot->kwargs = Py_XNewRef(kwargs); + boot->handle = self; + ThreadHandle_incref(self); + boot->handle_ready = (PyEvent){0}; + + PyThread_ident_t ident; + PyThread_handle_t os_handle; + if (PyThread_start_joinable_thread(thread_run, boot, &ident, &os_handle)) { + PyThreadState_Clear(boot->tstate); + thread_bootstate_free(boot, 1); + PyErr_SetString(ThreadError, "can't start new thread"); + goto start_failed; + } + + // Mark the handle running + PyMutex_Lock(&self->mutex); + assert(self->state == THREAD_HANDLE_STARTING); + self->ident = ident; + self->has_os_handle = 1; + self->os_handle = os_handle; + self->state = THREAD_HANDLE_RUNNING; + PyMutex_Unlock(&self->mutex); + + // Unblock the thread + _PyEvent_Notify(&boot->handle_ready); + + return 0; + +start_failed: + _PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)force_done, self); + return -1; +} + +static int +join_thread(ThreadHandle *handle) +{ + assert(get_thread_handle_state(handle) == THREAD_HANDLE_RUNNING); + PyThread_handle_t os_handle; + if (ThreadHandle_get_os_handle(handle, &os_handle)) { + int err = 0; + Py_BEGIN_ALLOW_THREADS + err = PyThread_join_thread(os_handle); + Py_END_ALLOW_THREADS + if (err) { + PyErr_SetString(ThreadError, "Failed joining thread"); + return -1; + } + } + set_thread_handle_state(handle, THREAD_HANDLE_DONE); + return 0; +} + +static int +check_started(ThreadHandle *self) +{ + ThreadHandleState state = get_thread_handle_state(self); + if (state < THREAD_HANDLE_RUNNING) { + PyErr_SetString(ThreadError, "thread not started"); + return -1; + } + return 0; +} + +static int +ThreadHandle_join(ThreadHandle *self, PyTime_t timeout_ns) +{ + if (check_started(self) < 0) { + return -1; } // We want to perform this check outside of the `_PyOnceFlag` to prevent @@ -225,45 +492,197 @@ ThreadHandle_join(ThreadHandleObject *self, void* ignored) // To work around this, we set `thread_is_exiting` immediately before // `thread_run` returns. We can be sure that we are not attempting to join // ourselves if the handle's thread is about to exit. - if (!_PyEvent_IsSet(&self->thread_is_exiting->event) && - self->ident == PyThread_get_thread_ident_ex()) { + if (!_PyEvent_IsSet(&self->thread_is_exiting) && + ThreadHandle_ident(self) == PyThread_get_thread_ident_ex()) { // PyThread_join_thread() would deadlock or error out. PyErr_SetString(ThreadError, "Cannot join current thread"); - return NULL; + return -1; + } + + // Wait until the deadline for the thread to exit. + PyTime_t deadline = timeout_ns != -1 ? _PyDeadline_Init(timeout_ns) : 0; + while (!PyEvent_WaitTimed(&self->thread_is_exiting, timeout_ns)) { + if (deadline) { + // _PyDeadline_Get will return a negative value if the deadline has + // been exceeded. + timeout_ns = Py_MAX(_PyDeadline_Get(deadline), 0); + } + + if (timeout_ns) { + // Interrupted + if (Py_MakePendingCalls() < 0) { + return -1; + } + } + else { + // Timed out + return 0; + } } if (_PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)join_thread, self) == -1) { + return -1; + } + assert(get_thread_handle_state(self) == THREAD_HANDLE_DONE); + return 0; +} + +static int +set_done(ThreadHandle *handle) +{ + assert(get_thread_handle_state(handle) == THREAD_HANDLE_RUNNING); + if (detach_thread(handle) < 0) { + PyErr_SetString(ThreadError, "failed detaching handle"); + return -1; + } + _PyEvent_Notify(&handle->thread_is_exiting); + set_thread_handle_state(handle, THREAD_HANDLE_DONE); + return 0; +} + +static int +ThreadHandle_set_done(ThreadHandle *self) +{ + if (check_started(self) < 0) { + return -1; + } + + if (_PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)set_done, self) == + -1) { + return -1; + } + assert(get_thread_handle_state(self) == THREAD_HANDLE_DONE); + return 0; +} + +// A wrapper around a ThreadHandle. +typedef struct { + PyObject_HEAD + + ThreadHandle *handle; +} PyThreadHandleObject; + +static PyThreadHandleObject * +PyThreadHandleObject_new(PyTypeObject *type) +{ + ThreadHandle *handle = ThreadHandle_new(); + if (handle == NULL) { + return NULL; + } + + PyThreadHandleObject *self = + (PyThreadHandleObject *)type->tp_alloc(type, 0); + if (self == NULL) { + ThreadHandle_decref(handle); + return NULL; + } + + self->handle = handle; + + return self; +} + +static PyObject * +PyThreadHandleObject_tp_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + return (PyObject *)PyThreadHandleObject_new(type); +} + +static void +PyThreadHandleObject_dealloc(PyThreadHandleObject *self) +{ + PyObject *tp = (PyObject *) Py_TYPE(self); + ThreadHandle_decref(self->handle); + PyObject_Free(self); + Py_DECREF(tp); +} + +static PyObject * +PyThreadHandleObject_repr(PyThreadHandleObject *self) +{ + PyThread_ident_t ident = ThreadHandle_ident(self->handle); + return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">", + Py_TYPE(self)->tp_name, ident); +} + +static PyObject * +PyThreadHandleObject_get_ident(PyThreadHandleObject *self, + PyObject *Py_UNUSED(ignored)) +{ + return PyLong_FromUnsignedLongLong(ThreadHandle_ident(self->handle)); +} + +static PyObject * +PyThreadHandleObject_join(PyThreadHandleObject *self, PyObject *args) +{ + PyObject *timeout_obj = NULL; + if (!PyArg_ParseTuple(args, "|O:join", &timeout_obj)) { + return NULL; + } + + PyTime_t timeout_ns = -1; + if (timeout_obj != NULL && timeout_obj != Py_None) { + if (_PyTime_FromSecondsObject(&timeout_ns, timeout_obj, + _PyTime_ROUND_TIMEOUT) < 0) { + return NULL; + } + } + + if (ThreadHandle_join(self->handle, timeout_ns) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + +static PyObject * +PyThreadHandleObject_is_done(PyThreadHandleObject *self, + PyObject *Py_UNUSED(ignored)) +{ + if (_PyEvent_IsSet(&self->handle->thread_is_exiting)) { + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + +static PyObject * +PyThreadHandleObject_set_done(PyThreadHandleObject *self, + PyObject *Py_UNUSED(ignored)) +{ + if (ThreadHandle_set_done(self->handle) < 0) { return NULL; } - assert(get_thread_handle_state(self) == THREAD_HANDLE_JOINED); Py_RETURN_NONE; } static PyGetSetDef ThreadHandle_getsetlist[] = { - {"ident", (getter)ThreadHandle_get_ident, NULL, NULL}, + {"ident", (getter)PyThreadHandleObject_get_ident, NULL, NULL}, {0}, }; -static PyMethodDef ThreadHandle_methods[] = -{ - {"join", (PyCFunction)ThreadHandle_join, METH_NOARGS}, +static PyMethodDef ThreadHandle_methods[] = { + {"join", (PyCFunction)PyThreadHandleObject_join, METH_VARARGS, NULL}, + {"_set_done", (PyCFunction)PyThreadHandleObject_set_done, METH_NOARGS, NULL}, + {"is_done", (PyCFunction)PyThreadHandleObject_is_done, METH_NOARGS, NULL}, {0, 0} }; static PyType_Slot ThreadHandle_Type_slots[] = { - {Py_tp_dealloc, (destructor)ThreadHandle_dealloc}, - {Py_tp_repr, (reprfunc)ThreadHandle_repr}, + {Py_tp_dealloc, (destructor)PyThreadHandleObject_dealloc}, + {Py_tp_repr, (reprfunc)PyThreadHandleObject_repr}, {Py_tp_getset, ThreadHandle_getsetlist}, {Py_tp_methods, ThreadHandle_methods}, + {Py_tp_new, PyThreadHandleObject_tp_new}, {0, 0} }; static PyType_Spec ThreadHandle_Type_spec = { "_thread._ThreadHandle", - sizeof(ThreadHandleObject), + sizeof(PyThreadHandleObject), 0, - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_DISALLOW_INSTANTIATION, + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_IMMUTABLETYPE, ThreadHandle_Type_slots, }; @@ -1272,98 +1691,6 @@ _localdummy_destroyed(PyObject *localweakref, PyObject *dummyweakref) /* Module functions */ -// bootstate is used to "bootstrap" new threads. Any arguments needed by -// `thread_run()`, which can only take a single argument due to platform -// limitations, are contained in bootstate. -struct bootstate { - PyThreadState *tstate; - PyObject *func; - PyObject *args; - PyObject *kwargs; - _PyEventRc *thread_is_exiting; -}; - - -static void -thread_bootstate_free(struct bootstate *boot, int decref) -{ - if (decref) { - Py_DECREF(boot->func); - Py_DECREF(boot->args); - Py_XDECREF(boot->kwargs); - } - if (boot->thread_is_exiting != NULL) { - _PyEventRc_Decref(boot->thread_is_exiting); - } - PyMem_RawFree(boot); -} - - -static void -thread_run(void *boot_raw) -{ - struct bootstate *boot = (struct bootstate *) boot_raw; - PyThreadState *tstate = boot->tstate; - - // `thread_is_exiting` needs to be set after bootstate has been freed - _PyEventRc *thread_is_exiting = boot->thread_is_exiting; - boot->thread_is_exiting = NULL; - - // gh-108987: If _thread.start_new_thread() is called before or while - // Python is being finalized, thread_run() can called *after*. - // _PyRuntimeState_SetFinalizing() is called. At this point, all Python - // threads must exit, except of the thread calling Py_Finalize() whch holds - // the GIL and must not exit. - // - // At this stage, tstate can be a dangling pointer (point to freed memory), - // it's ok to call _PyThreadState_MustExit() with a dangling pointer. - if (_PyThreadState_MustExit(tstate)) { - // Don't call PyThreadState_Clear() nor _PyThreadState_DeleteCurrent(). - // These functions are called on tstate indirectly by Py_Finalize() - // which calls _PyInterpreterState_Clear(). - // - // Py_DECREF() cannot be called because the GIL is not held: leak - // references on purpose. Python is being finalized anyway. - thread_bootstate_free(boot, 0); - goto exit; - } - - _PyThreadState_Bind(tstate); - PyEval_AcquireThread(tstate); - _Py_atomic_add_ssize(&tstate->interp->threads.count, 1); - - PyObject *res = PyObject_Call(boot->func, boot->args, boot->kwargs); - if (res == NULL) { - if (PyErr_ExceptionMatches(PyExc_SystemExit)) - /* SystemExit is ignored silently */ - PyErr_Clear(); - else { - PyErr_FormatUnraisable( - "Exception ignored in thread started by %R", boot->func); - } - } - else { - Py_DECREF(res); - } - - thread_bootstate_free(boot, 1); - - _Py_atomic_add_ssize(&tstate->interp->threads.count, -1); - PyThreadState_Clear(tstate); - _PyThreadState_DeleteCurrent(tstate); - -exit: - if (thread_is_exiting != NULL) { - _PyEvent_Notify(&thread_is_exiting->event); - _PyEventRc_Decref(thread_is_exiting); - } - - // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with - // the glibc, pthread_exit() can abort the whole process if dlopen() fails - // to open the libgcc_s.so library (ex: EMFILE error). - return; -} - static PyObject * thread_daemon_threads_allowed(PyObject *module, PyObject *Py_UNUSED(ignored)) { @@ -1383,11 +1710,8 @@ Return True if daemon threads are allowed in the current interpreter,\n\ and False otherwise.\n"); static int -do_start_new_thread(thread_module_state* state, - PyObject *func, PyObject* args, PyObject* kwargs, - int joinable, - PyThread_ident_t* ident, PyThread_handle_t* handle, - _PyEventRc *thread_is_exiting) +do_start_new_thread(thread_module_state *state, PyObject *func, PyObject *args, + PyObject *kwargs, ThreadHandle *handle, int daemon) { PyInterpreterState *interp = _PyInterpreterState_GET(); if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) { @@ -1401,44 +1725,20 @@ do_start_new_thread(thread_module_state* state, return -1; } - // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(), - // because it should be possible to call thread_bootstate_free() - // without holding the GIL. - struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate)); - if (boot == NULL) { - PyErr_NoMemory(); - return -1; + if (!daemon) { + // Add the handle before starting the thread to avoid adding a handle + // to a thread that has already finished (i.e. if the thread finishes + // before the call to `ThreadHandle_start()` below returns). + add_to_shutdown_handles(state, handle); } - boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING); - if (boot->tstate == NULL) { - PyMem_RawFree(boot); - if (!PyErr_Occurred()) { - PyErr_NoMemory(); + + if (ThreadHandle_start(handle, func, args, kwargs) < 0) { + if (!daemon) { + remove_from_shutdown_handles(handle); } return -1; } - boot->func = Py_NewRef(func); - boot->args = Py_NewRef(args); - boot->kwargs = Py_XNewRef(kwargs); - boot->thread_is_exiting = thread_is_exiting; - if (thread_is_exiting != NULL) { - _PyEventRc_Incref(thread_is_exiting); - } - int err; - if (joinable) { - err = PyThread_start_joinable_thread(thread_run, (void*) boot, ident, handle); - } else { - *handle = 0; - *ident = PyThread_start_new_thread(thread_run, (void*) boot); - err = (*ident == PYTHREAD_INVALID_THREAD_ID); - } - if (err) { - PyErr_SetString(ThreadError, "can't start new thread"); - PyThreadState_Clear(boot->tstate); - thread_bootstate_free(boot, 1); - return -1; - } return 0; } @@ -1472,12 +1772,19 @@ thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) return NULL; } - PyThread_ident_t ident = 0; - PyThread_handle_t handle; - if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0, - &ident, &handle, NULL)) { + ThreadHandle *handle = ThreadHandle_new(); + if (handle == NULL) { return NULL; } + + int st = + do_start_new_thread(state, func, args, kwargs, handle, /*daemon=*/1); + if (st < 0) { + ThreadHandle_decref(handle); + return NULL; + } + PyThread_ident_t ident = ThreadHandle_ident(handle); + ThreadHandle_decref(handle); return PyLong_FromUnsignedLongLong(ident); } @@ -1495,9 +1802,19 @@ unhandled exception; a stack trace will be printed unless the exception\n\ is SystemExit.\n"); static PyObject * -thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func) +thread_PyThread_start_joinable_thread(PyObject *module, PyObject *fargs, + PyObject *fkwargs) { + static char *keywords[] = {"function", "handle", "daemon", NULL}; + PyObject *func = NULL; + int daemon = 1; thread_module_state *state = get_thread_state(module); + PyObject *hobj = NULL; + if (!PyArg_ParseTupleAndKeywords(fargs, fkwargs, + "O|Op:start_joinable_thread", keywords, + &func, &hobj, &daemon)) { + return NULL; + } if (!PyCallable_Check(func)) { PyErr_SetString(PyExc_TypeError, @@ -1505,32 +1822,45 @@ thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func) return NULL; } - if (PySys_Audit("_thread.start_joinable_thread", "O", func) < 0) { + if (hobj == NULL) { + hobj = Py_None; + } + else if (hobj != Py_None && !Py_IS_TYPE(hobj, state->thread_handle_type)) { + PyErr_SetString(PyExc_TypeError, "'handle' must be a _ThreadHandle"); return NULL; } + if (PySys_Audit("_thread.start_joinable_thread", "OiO", func, daemon, + hobj) < 0) { + return NULL; + } + + if (hobj == Py_None) { + hobj = (PyObject *)PyThreadHandleObject_new(state->thread_handle_type); + if (hobj == NULL) { + return NULL; + } + } + else { + Py_INCREF(hobj); + } + PyObject* args = PyTuple_New(0); if (args == NULL) { return NULL; } - ThreadHandleObject* hobj = new_thread_handle(state); - if (hobj == NULL) { - Py_DECREF(args); - return NULL; - } - if (do_start_new_thread(state, func, args, /*kwargs=*/ NULL, /*joinable=*/ 1, - &hobj->ident, &hobj->handle, hobj->thread_is_exiting)) { - Py_DECREF(args); + int st = do_start_new_thread(state, func, args, + /*kwargs=*/ NULL, ((PyThreadHandleObject*)hobj)->handle, daemon); + Py_DECREF(args); + if (st < 0) { Py_DECREF(hobj); return NULL; } - set_thread_handle_state(hobj, THREAD_HANDLE_RUNNING); - Py_DECREF(args); - return (PyObject*) hobj; + return (PyObject *) hobj; } PyDoc_STRVAR(start_joinable_doc, -"start_joinable_thread(function)\n\ +"start_joinable_thread(function[, daemon=True[, handle=None]])\n\ \n\ *For internal use only*: start a new thread.\n\ \n\ @@ -1538,7 +1868,9 @@ Like start_new_thread(), this starts a new thread calling the given function.\n\ Unlike start_new_thread(), this returns a handle object with methods to join\n\ or detach the given thread.\n\ This function is not for third-party code, please use the\n\ -`threading` module instead.\n"); +`threading` module instead. During finalization the runtime will not wait for\n\ +the thread to exit if daemon is True. If handle is provided it must be a\n\ +newly created thread._ThreadHandle instance."); static PyObject * thread_PyThread_exit_thread(PyObject *self, PyObject *Py_UNUSED(ignored)) @@ -1650,67 +1982,6 @@ yet finished.\n\ This function is meant for internal and specialized purposes only.\n\ In most applications `threading.enumerate()` should be used instead."); -static void -release_sentinel(void *weakref_raw) -{ - PyObject *weakref = _PyObject_CAST(weakref_raw); - - /* Tricky: this function is called when the current thread state - is being deleted. Therefore, only simple C code can safely - execute here. */ - lockobject *lock = (lockobject *)_PyWeakref_GET_REF(weakref); - if (lock != NULL) { - if (lock->locked) { - lock->locked = 0; - PyThread_release_lock(lock->lock_lock); - } - Py_DECREF(lock); - } - - /* Deallocating a weakref with a NULL callback only calls - PyObject_GC_Del(), which can't call any Python code. */ - Py_DECREF(weakref); -} - -static PyObject * -thread__set_sentinel(PyObject *module, PyObject *Py_UNUSED(ignored)) -{ - PyObject *wr; - PyThreadState *tstate = _PyThreadState_GET(); - lockobject *lock; - - if (tstate->on_delete_data != NULL) { - /* We must support the re-creation of the lock from a - fork()ed child. */ - assert(tstate->on_delete == &release_sentinel); - wr = (PyObject *) tstate->on_delete_data; - tstate->on_delete = NULL; - tstate->on_delete_data = NULL; - Py_DECREF(wr); - } - lock = newlockobject(module); - if (lock == NULL) - return NULL; - /* The lock is owned by whoever called _set_sentinel(), but the weakref - hangs to the thread state. */ - wr = PyWeakref_NewRef((PyObject *) lock, NULL); - if (wr == NULL) { - Py_DECREF(lock); - return NULL; - } - tstate->on_delete_data = (void *) wr; - tstate->on_delete = &release_sentinel; - return (PyObject *) lock; -} - -PyDoc_STRVAR(_set_sentinel_doc, -"_set_sentinel() -> lock\n\ -\n\ -Set a sentinel lock that will be released when the current thread\n\ -state is finalized (after it is untied from the interpreter).\n\ -\n\ -This is a private API for the threading module."); - static PyObject * thread_stack_size(PyObject *self, PyObject *args) { @@ -1917,13 +2188,101 @@ PyDoc_STRVAR(thread__is_main_interpreter_doc, \n\ Return True if the current interpreter is the main Python interpreter."); +static PyObject * +thread_shutdown(PyObject *self, PyObject *args) +{ + PyThread_ident_t ident = PyThread_get_thread_ident_ex(); + thread_module_state *state = get_thread_state(self); + + for (;;) { + ThreadHandle *handle = NULL; + + // Find a thread that's not yet finished. + HEAD_LOCK(&_PyRuntime); + struct llist_node *node; + llist_for_each_safe(node, &state->shutdown_handles) { + ThreadHandle *cur = llist_data(node, ThreadHandle, shutdown_node); + if (cur->ident != ident) { + ThreadHandle_incref(cur); + handle = cur; + break; + } + } + HEAD_UNLOCK(&_PyRuntime); + + if (!handle) { + // No more threads to wait on! + break; + } + + // Wait for the thread to finish. If we're interrupted, such + // as by a ctrl-c we print the error and exit early. + if (ThreadHandle_join(handle, -1) < 0) { + PyErr_WriteUnraisable(NULL); + ThreadHandle_decref(handle); + Py_RETURN_NONE; + } + + ThreadHandle_decref(handle); + } + + Py_RETURN_NONE; +} + +PyDoc_STRVAR(shutdown_doc, +"_shutdown()\n\ +\n\ +Wait for all non-daemon threads (other than the calling thread) to stop."); + +static PyObject * +thread__make_thread_handle(PyObject *module, PyObject *identobj) +{ + thread_module_state *state = get_thread_state(module); + if (!PyLong_Check(identobj)) { + PyErr_SetString(PyExc_TypeError, "ident must be an integer"); + return NULL; + } + PyThread_ident_t ident = PyLong_AsUnsignedLongLong(identobj); + if (PyErr_Occurred()) { + return NULL; + } + PyThreadHandleObject *hobj = + PyThreadHandleObject_new(state->thread_handle_type); + if (hobj == NULL) { + return NULL; + } + PyMutex_Lock(&hobj->handle->mutex); + hobj->handle->ident = ident; + hobj->handle->state = THREAD_HANDLE_RUNNING; + PyMutex_Unlock(&hobj->handle->mutex); + return (PyObject*) hobj; +} + +PyDoc_STRVAR(thread__make_thread_handle_doc, +"_make_thread_handle(ident)\n\ +\n\ +Internal only. Make a thread handle for threads not spawned\n\ +by the _thread or threading module."); + +static PyObject * +thread__get_main_thread_ident(PyObject *module, PyObject *Py_UNUSED(ignored)) +{ + return PyLong_FromUnsignedLongLong(_PyRuntime.main_thread); +} + +PyDoc_STRVAR(thread__get_main_thread_ident_doc, +"_get_main_thread_ident()\n\ +\n\ +Internal only. Return a non-zero integer that uniquely identifies the main thread\n\ +of the main interpreter."); + static PyMethodDef thread_methods[] = { {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, {"start_new", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, - {"start_joinable_thread", (PyCFunction)thread_PyThread_start_joinable_thread, - METH_O, start_joinable_doc}, + {"start_joinable_thread", _PyCFunction_CAST(thread_PyThread_start_joinable_thread), + METH_VARARGS | METH_KEYWORDS, start_joinable_doc}, {"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed, METH_NOARGS, daemon_threads_allowed_doc}, {"allocate_lock", thread_PyThread_allocate_lock, @@ -1946,12 +2305,16 @@ static PyMethodDef thread_methods[] = { METH_NOARGS, _count_doc}, {"stack_size", (PyCFunction)thread_stack_size, METH_VARARGS, stack_size_doc}, - {"_set_sentinel", thread__set_sentinel, - METH_NOARGS, _set_sentinel_doc}, {"_excepthook", thread_excepthook, METH_O, excepthook_doc}, {"_is_main_interpreter", thread__is_main_interpreter, METH_NOARGS, thread__is_main_interpreter_doc}, + {"_shutdown", thread_shutdown, + METH_NOARGS, shutdown_doc}, + {"_make_thread_handle", thread__make_thread_handle, + METH_O, thread__make_thread_handle_doc}, + {"_get_main_thread_ident", thread__get_main_thread_ident, + METH_NOARGS, thread__get_main_thread_ident_doc}, {NULL, NULL} /* sentinel */ }; @@ -2041,6 +2404,8 @@ thread_module_exec(PyObject *module) return -1; } + llist_init(&state->shutdown_handles); + return 0; } @@ -2066,6 +2431,10 @@ thread_module_clear(PyObject *module) Py_CLEAR(state->local_type); Py_CLEAR(state->local_dummy_type); Py_CLEAR(state->thread_handle_type); + // Remove any remaining handles (e.g. if shutdown exited early due to + // interrupt) so that attempts to unlink the handle after our module state + // is destroyed do not crash. + clear_shutdown_handles(state); return 0; } diff --git a/Python/lock.c b/Python/lock.c index de25adce385..7d1ead585de 100644 --- a/Python/lock.c +++ b/Python/lock.c @@ -304,30 +304,6 @@ PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns) } } -_PyEventRc * -_PyEventRc_New(void) -{ - _PyEventRc *erc = (_PyEventRc *)PyMem_RawCalloc(1, sizeof(_PyEventRc)); - if (erc != NULL) { - erc->refcount = 1; - } - return erc; -} - -void -_PyEventRc_Incref(_PyEventRc *erc) -{ - _Py_atomic_add_ssize(&erc->refcount, 1); -} - -void -_PyEventRc_Decref(_PyEventRc *erc) -{ - if (_Py_atomic_add_ssize(&erc->refcount, -1) == 1) { - PyMem_RawFree(erc); - } -} - static int unlock_once(_PyOnceFlag *o, int res) { diff --git a/Python/pystate.c b/Python/pystate.c index 635616c5648..9f142223aff 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1032,20 +1032,7 @@ _PyInterpreterState_SetRunningMain(PyInterpreterState *interp) void _PyInterpreterState_SetNotRunningMain(PyInterpreterState *interp) { - PyThreadState *tstate = interp->threads.main; - assert(tstate == current_fast_get()); - - if (tstate->on_delete != NULL) { - // The threading module was imported for the first time in this - // thread, so it was set as threading._main_thread. (See gh-75698.) - // The thread has finished running the Python program so we mark - // the thread object as finished. - assert(tstate->_whence != _PyThreadState_WHENCE_THREADING); - tstate->on_delete(tstate->on_delete_data); - tstate->on_delete = NULL; - tstate->on_delete_data = NULL; - } - + assert(interp->threads.main == current_fast_get()); interp->threads.main = NULL; } @@ -1570,16 +1557,6 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(tstate->context); - if (tstate->on_delete != NULL) { - // For the "main" thread of each interpreter, this is meant - // to be done in _PyInterpreterState_SetNotRunningMain(). - // That leaves threads created by the threading module, - // and any threads killed by forking. - // However, we also accommodate "main" threads that still - // don't call _PyInterpreterState_SetNotRunningMain() yet. - tstate->on_delete(tstate->on_delete_data); - } - #ifdef Py_GIL_DISABLED // Each thread should clear own freelists in free-threading builds. struct _Py_object_freelists *freelists = _Py_object_freelists_GET();