# This script runs a set of small benchmarks to help identify scaling
# bottlenecks in the free-threaded interpreter. The benchmarks consist
# of patterns that ought to scale well, but haven't in the past. This is
# typically due to reference count contention or lock contention.
#
# This is not intended to be a general multithreading benchmark suite, nor
# are the benchmarks intended to be representative of real-world workloads.
#
# On Linux, to avoid confounding hardware effects, the script attempts to:
#  * Use a single CPU socket (to avoid NUMA effects)
#  * Use distinct physical cores (to avoid hyperthreading/SMT effects)
#  * Use "performance" cores (Intel, ARM) on CPUs that have performance and
#    efficiency cores
#
# It also helps to disable dynamic frequency scaling (i.e., "Turbo Boost")
#
# Intel:
# > echo "1" | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo
#
# AMD:
# > echo "0" | sudo tee /sys/devices/system/cpu/cpufreq/boost
#

import math
import os
import queue
import sys
import threading
import time

# The iterations in individual benchmarks are scaled by this factor.
WORK_SCALE = 100

ALL_BENCHMARKS = {}

threads = []
in_queues = []
out_queues = []


def register_benchmark(func):
    ALL_BENCHMARKS[func.__name__] = func
    return func


@register_benchmark
def object_cfunction():
    accu = 0
    tab = [1] * 100
    for i in range(1000 * WORK_SCALE):
        tab.pop(0)
        tab.append(i)
        accu += tab[50]
    return accu


@register_benchmark
def cmodule_function():
    for i in range(1000 * WORK_SCALE):
        math.floor(i * i)


@register_benchmark
def mult_constant():
    x = 1.0
    for i in range(3000 * WORK_SCALE):
        x *= 1.01


def simple_gen():
    for i in range(10):
        yield i


@register_benchmark
def generator():
    accu = 0
    for i in range(100 * WORK_SCALE):
        for v in simple_gen():
            accu += v
    return accu


class Counter:
    def __init__(self):
        self.i = 0

    def next_number(self):
        self.i += 1
        return self.i


@register_benchmark
def pymethod():
    c = Counter()
    for i in range(1000 * WORK_SCALE):
        c.next_number()
    return c.i


def next_number(i):
    return i + 1


@register_benchmark
def pyfunction():
    accu = 0
    for i in range(1000 * WORK_SCALE):
        accu = next_number(i)
    return accu


def double(x):
    return x + x


module = sys.modules[__name__]


@register_benchmark
def module_function():
    total = 0
    for i in range(1000 * WORK_SCALE):
        total += module.double(i)
    return total


class MyObject:
    pass


@register_benchmark
def load_string_const():
    accu = 0
    for i in range(1000 * WORK_SCALE):
        if i == 'a string':
            accu += 7
        else:
            accu += 1
    return accu


@register_benchmark
def load_tuple_const():
    accu = 0
    for i in range(1000 * WORK_SCALE):
        if i == (1, 2):
            accu += 7
        else:
            accu += 1
    return accu


@register_benchmark
def create_pyobject():
    for i in range(1000 * WORK_SCALE):
        o = MyObject()


@register_benchmark
def create_closure():
    for i in range(1000 * WORK_SCALE):
        def foo(x):
            return x
        foo(i)


@register_benchmark
def create_dict():
    for i in range(1000 * WORK_SCALE):
        d = {
            "key": "value",
        }


thread_local = threading.local()


@register_benchmark
def thread_local_read():
    tmp = thread_local
    tmp.x = 10
    for i in range(500 * WORK_SCALE):
        _ = tmp.x
        _ = tmp.x
        _ = tmp.x
        _ = tmp.x
        _ = tmp.x


def bench_one_thread(func):
    t0 = time.perf_counter_ns()
    func()
    t1 = time.perf_counter_ns()
    return t1 - t0


def bench_parallel(func):
    t0 = time.perf_counter_ns()
    for inq in in_queues:
        inq.put(func)
    for outq in out_queues:
        outq.get()
    t1 = time.perf_counter_ns()
    return t1 - t0


def benchmark(func):
    delta_one_thread = bench_one_thread(func)
    delta_many_threads = bench_parallel(func)

    speedup = delta_one_thread * len(threads) / delta_many_threads
    if speedup >= 1:
        factor = speedup
        direction = "faster"
    else:
        factor = 1 / speedup
        direction = "slower"

    use_color = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
    color = reset_color = ""
    if use_color:
        if speedup <= 1.1:
            color = "\x1b[31m"  # red
        elif speedup < len(threads)/2:
            color = "\x1b[33m"  # yellow
        reset_color = "\x1b[0m"

    print(f"{color}{func.__name__:<18} {round(factor, 1):>4}x {direction}{reset_color}")


def determine_num_threads_and_affinity():
    if sys.platform != "linux":
        return [None] * os.cpu_count()

    # Try to use `lscpu -p` on Linux
    import subprocess
    try:
        output = subprocess.check_output(["lscpu", "-p=cpu,node,core,MAXMHZ"],
                                         text=True, env={"LC_NUMERIC": "C"})
    except (FileNotFoundError, subprocess.CalledProcessError):
        return [None] * os.cpu_count()

    table = []
    for line in output.splitlines():
        if line.startswith("#"):
            continue
        cpu, node, core, maxhz = line.split(",")
        if maxhz == "":
            maxhz = "0"
        table.append((int(cpu), int(node), int(core), float(maxhz)))

    cpus = []
    cores = set()
    max_mhz_all = max(row[3] for row in table)
    for cpu, node, core, maxmhz in table:
        # Choose only CPUs on the same node, unique cores, and try to avoid
        # "efficiency" cores.
        if node == 0 and core not in cores and maxmhz == max_mhz_all:
            cpus.append(cpu)
            cores.add(core)
    return cpus


def thread_run(cpu, in_queue, out_queue):
    if cpu is not None and hasattr(os, "sched_setaffinity"):
        # Set the affinity for the current thread
        os.sched_setaffinity(0, (cpu,))

    while True:
        func = in_queue.get()
        if func is None:
            break
        func()
        out_queue.put(None)


def initialize_threads(opts):
    if opts.threads == -1:
        cpus = determine_num_threads_and_affinity()
    else:
        cpus = [None] * opts.threads  # don't set affinity

    print(f"Running benchmarks with {len(cpus)} threads")
    for cpu in cpus:
        inq = queue.Queue()
        outq = queue.Queue()
        in_queues.append(inq)
        out_queues.append(outq)
        t = threading.Thread(target=thread_run, args=(cpu, inq, outq), daemon=True)
        threads.append(t)
        t.start()


def main(opts):
    global WORK_SCALE
    if not hasattr(sys, "_is_gil_enabled") or sys._is_gil_enabled():
        sys.stderr.write("expected to be run with the GIL disabled\n")

    benchmark_names = opts.benchmarks
    if benchmark_names:
        for name in benchmark_names:
            if name not in ALL_BENCHMARKS:
                sys.stderr.write(f"Unknown benchmark: {name}\n")
                sys.exit(1)
    else:
        benchmark_names = ALL_BENCHMARKS.keys()

    WORK_SCALE = opts.scale

    if not opts.baseline_only:
        initialize_threads(opts)

    do_bench = not opts.baseline_only and not opts.parallel_only
    for name in benchmark_names:
        func = ALL_BENCHMARKS[name]
        if do_bench:
            benchmark(func)
            continue

        if opts.parallel_only:
            delta_ns = bench_parallel(func)
        else:
            delta_ns = bench_one_thread(func)

        time_ms = delta_ns / 1_000_000
        print(f"{func.__name__:<18} {time_ms:.1f} ms")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--threads", type=int, default=-1,
                        help="number of threads to use")
    parser.add_argument("--scale", type=int, default=100,
                        help="work scale factor for the benchmark (default=100)")
    parser.add_argument("--baseline-only", default=False, action="store_true",
                        help="only run the baseline benchmarks (single thread)")
    parser.add_argument("--parallel-only", default=False, action="store_true",
                        help="only run the parallel benchmark (many threads)")
    parser.add_argument("benchmarks", nargs="*",
                        help="benchmarks to run")
    options = parser.parse_args()
    main(options)