"""Tests for thread-local bytecode.""" import dis import textwrap import unittest from test import support from test.support import cpython_only, import_helper, requires_specialization_ft from test.support.script_helper import assert_python_ok from test.support.threading_helper import requires_working_threading # Skip this test if the _testinternalcapi module isn't available _testinternalcapi = import_helper.import_module("_testinternalcapi") @cpython_only @requires_working_threading() @unittest.skipUnless(support.Py_GIL_DISABLED, "only in free-threaded builds") class TLBCTests(unittest.TestCase): @requires_specialization_ft def test_new_threads_start_with_unspecialized_code(self): code = textwrap.dedent(""" import dis import queue import threading from _testinternalcapi import get_tlbc def all_opnames(bc): return {i.opname for i in dis._get_instructions_bytes(bc)} def f(a, b, q=None): if q is not None: q.put(get_tlbc(f)) return a + b for _ in range(100): # specialize f(1, 2) q = queue.Queue() t = threading.Thread(target=f, args=('a', 'b', q)) t.start() t.join() assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f)) assert "BINARY_OP_ADD_INT" not in all_opnames(q.get()) """) assert_python_ok("-X", "tlbc=1", "-c", code) @requires_specialization_ft def test_threads_specialize_independently(self): code = textwrap.dedent(""" import dis import queue import threading from _testinternalcapi import get_tlbc def all_opnames(bc): return {i.opname for i in dis._get_instructions_bytes(bc)} def f(a, b): return a + b def g(a, b, q=None): for _ in range(100): f(a, b) if q is not None: q.put(get_tlbc(f)) # specialize in main thread g(1, 2) # specialize in other thread q = queue.Queue() t = threading.Thread(target=g, args=('a', 'b', q)) t.start() t.join() assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f)) t_opnames = all_opnames(q.get()) assert "BINARY_OP_ADD_INT" not in t_opnames assert "BINARY_OP_ADD_UNICODE" in t_opnames """) assert_python_ok("-X", "tlbc=1", "-c", code) def test_reuse_tlbc_across_threads_different_lifetimes(self): code = textwrap.dedent(""" import queue import threading from _testinternalcapi import get_tlbc_id def f(a, b, q=None): if q is not None: q.put(get_tlbc_id(f)) return a + b q = queue.Queue() tlbc_ids = [] for _ in range(3): t = threading.Thread(target=f, args=('a', 'b', q)) t.start() t.join() tlbc_ids.append(q.get()) assert tlbc_ids[0] == tlbc_ids[1] assert tlbc_ids[1] == tlbc_ids[2] """) assert_python_ok("-X", "tlbc=1", "-c", code) def test_no_copies_if_tlbc_disabled(self): code = textwrap.dedent(""" import queue import threading from _testinternalcapi import get_tlbc_id def f(a, b, q=None): if q is not None: q.put(get_tlbc_id(f)) return a + b q = queue.Queue() threads = [] for _ in range(3): t = threading.Thread(target=f, args=('a', 'b', q)) t.start() threads.append(t) tlbc_ids = [] for t in threads: t.join() tlbc_ids.append(q.get()) main_tlbc_id = get_tlbc_id(f) assert main_tlbc_id is not None assert tlbc_ids[0] == main_tlbc_id assert tlbc_ids[1] == main_tlbc_id assert tlbc_ids[2] == main_tlbc_id """) assert_python_ok("-X", "tlbc=0", "-c", code) def test_no_specialization_if_tlbc_disabled(self): code = textwrap.dedent(""" import dis import queue import threading from _testinternalcapi import get_tlbc def all_opnames(f): bc = get_tlbc(f) return {i.opname for i in dis._get_instructions_bytes(bc)} def f(a, b): return a + b for _ in range(100): f(1, 2) assert "BINARY_OP_ADD_INT" not in all_opnames(f) """) assert_python_ok("-X", "tlbc=0", "-c", code) def test_generator_throw(self): code = textwrap.dedent(""" import queue import threading from _testinternalcapi import get_tlbc_id def g(): try: yield except: yield get_tlbc_id(g) def f(q): gen = g() next(gen) q.put(gen.throw(ValueError)) q = queue.Queue() t = threading.Thread(target=f, args=(q,)) t.start() t.join() gen = g() next(gen) main_id = gen.throw(ValueError) assert main_id != q.get() """) assert_python_ok("-X", "tlbc=1", "-c", code) if __name__ == "__main__": unittest.main()