mirror of
https://github.com/python/cpython.git
synced 2024-11-28 16:45:42 +01:00
7e38e6745d
The `zip_next` function uses a common optimization technique for methods that generate tuples. The iterator maintains an internal reference to the returned tuple. When the method is called again, it checks if the internal tuple's reference count is 1. If so, the tuple can be reused. However, this approach is not safe under the free-threading build: after checking the reference count, another thread may perform the same check and also reuse the tuple. This can result in a double decref on the items of the replaced tuple and a double incref (memory leak) on the items of the tuple being set. This adds a function, `_PyObject_IsUniquelyReferenced`, that encapsulates the stricter logic necessary for the free-threaded build: the internal tuple must be owned by the current thread, have a local refcount of one, and a shared refcount of zero.
42 lines
1.0 KiB
Python
42 lines
1.0 KiB
Python
import unittest
|
|
from threading import Thread
|
|
|
|
from test.support import threading_helper
|
|
|
|
|
|
class ZipThreading(unittest.TestCase):
|
|
@staticmethod
|
|
def work(enum):
|
|
while True:
|
|
try:
|
|
next(enum)
|
|
except StopIteration:
|
|
break
|
|
|
|
@threading_helper.reap_threads
|
|
@threading_helper.requires_working_threading()
|
|
def test_threading(self):
|
|
number_of_threads = 8
|
|
number_of_iterations = 8
|
|
n = 40_000
|
|
enum = zip(range(n), range(n))
|
|
for _ in range(number_of_iterations):
|
|
worker_threads = []
|
|
for ii in range(number_of_threads):
|
|
worker_threads.append(
|
|
Thread(
|
|
target=self.work,
|
|
args=[
|
|
enum,
|
|
],
|
|
)
|
|
)
|
|
for t in worker_threads:
|
|
t.start()
|
|
for t in worker_threads:
|
|
t.join()
|
|
|
|
|
|
# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()
|