"""Test the interactive interpreter.""" import os import select import subprocess import sys import unittest from textwrap import dedent from test import support from test.support import ( cpython_only, has_subprocess_support, os_helper, SuppressCrashReport, SHORT_TIMEOUT, ) from test.support.script_helper import kill_python from test.support.import_helper import import_module try: import pty except ImportError: pty = None if not has_subprocess_support: raise unittest.SkipTest("test module requires subprocess") def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): """Run the Python REPL with the given arguments. kw is extra keyword args to pass to subprocess.Popen. Returns a Popen object. """ # To run the REPL without using a terminal, spawn python with the command # line option '-i' and the process name set to ''. # The directory of argv[0] must match the directory of the Python # executable for the Popen() call to python to succeed as the directory # path may be used by Py_GetPath() to build the default module search # path. stdin_fname = os.path.join(os.path.dirname(sys.executable), "") cmd_line = [stdin_fname, '-I', '-i'] cmd_line.extend(args) # Set TERM=vt100, for the rationale see the comments in spawn_python() of # test.support.script_helper. env = kw.setdefault('env', dict(os.environ)) env['TERM'] = 'vt100' return subprocess.Popen(cmd_line, executable=sys.executable, text=True, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr, **kw) def run_on_interactive_mode(source): """Spawn a new Python interpreter, pass the given input source code from the stdin and return the result back. If the interpreter exits non-zero, it raises a ValueError.""" process = spawn_repl() process.stdin.write(source) output = kill_python(process) if process.returncode != 0: raise ValueError("Process didn't exit properly.") return output class TestInteractiveInterpreter(unittest.TestCase): @cpython_only # Python built with Py_TRACE_REFS fail with a fatal error in # _PyRefchain_Trace() on memory allocation error. @unittest.skipIf(support.Py_TRACE_REFS, 'cannot test Py_TRACE_REFS build') def test_no_memory(self): import_module("_testcapi") # Issue #30696: Fix the interactive interpreter looping endlessly when # no memory. Check also that the fix does not break the interactive # loop when an exception is raised. user_input = """ import sys, _testcapi 1/0 print('After the exception.') _testcapi.set_nomemory(0) sys.exit(0) """ user_input = dedent(user_input) p = spawn_repl() with SuppressCrashReport(): p.stdin.write(user_input) output = kill_python(p) self.assertIn('After the exception.', output) # Exit code 120: Py_FinalizeEx() failed to flush stdout and stderr. self.assertIn(p.returncode, (1, 120)) @cpython_only def test_multiline_string_parsing(self): # bpo-39209: Multiline string tokens need to be handled in the tokenizer # in two places: the interactive path and the non-interactive path. user_input = '''\ x = """ 0KiB 0 1.3 0 16738211KiB 237.15 1.3 0 never none """ ''' user_input = dedent(user_input) p = spawn_repl() p.stdin.write(user_input) output = kill_python(p) self.assertEqual(p.returncode, 0) def test_close_stdin(self): user_input = dedent(''' import os print("before close") os.close(0) ''') prepare_repl = dedent(''' from test.support import suppress_msvcrt_asserts suppress_msvcrt_asserts() ''') process = spawn_repl('-c', prepare_repl) output = process.communicate(user_input)[0] self.assertEqual(process.returncode, 0) self.assertIn('before close', output) def test_interactive_traceback_reporting(self): user_input = "1 / 0 / 3 / 4" p = spawn_repl() p.stdin.write(user_input) output = kill_python(p) self.assertEqual(p.returncode, 0) traceback_lines = output.splitlines()[-6:-1] expected_lines = [ "Traceback (most recent call last):", " File \"\", line 1, in ", " 1 / 0 / 3 / 4", " ~~^~~", "ZeroDivisionError: division by zero", ] self.assertEqual(traceback_lines, expected_lines) def test_interactive_traceback_reporting_multiple_input(self): user_input1 = dedent(""" def foo(x): 1 / x """) p = spawn_repl() p.stdin.write(user_input1) user_input2 = "foo(0)" p.stdin.write(user_input2) output = kill_python(p) self.assertEqual(p.returncode, 0) traceback_lines = output.splitlines()[-8:-1] expected_lines = [ ' File "", line 1, in ', ' foo(0)', ' ~~~^^^', ' File "", line 2, in foo', ' 1 / x', ' ~~^~~', 'ZeroDivisionError: division by zero' ] self.assertEqual(traceback_lines, expected_lines) def test_runsource_show_syntax_error_location(self): user_input = dedent("""def f(x, x): ... """) p = spawn_repl() p.stdin.write(user_input) output = kill_python(p) expected_lines = [ ' def f(x, x): ...', ' ^', "SyntaxError: duplicate argument 'x' in function definition" ] self.assertEqual(output.splitlines()[4:-1], expected_lines) def test_interactive_source_is_in_linecache(self): user_input = dedent(""" def foo(x): return x + 1 def bar(x): return foo(x) + 2 """) p = spawn_repl() p.stdin.write(user_input) user_input2 = dedent(""" import linecache print(linecache.cache['-1']) """) p.stdin.write(user_input2) output = kill_python(p) self.assertEqual(p.returncode, 0) expected = "(30, None, [\'def foo(x):\\n\', \' return x + 1\\n\', \'\\n\'], \'\')" self.assertIn(expected, output, expected) def test_asyncio_repl_reaches_python_startup_script(self): with os_helper.temp_dir() as tmpdir: script = os.path.join(tmpdir, "pythonstartup.py") with open(script, "w") as f: f.write("print('pythonstartup done!')" + os.linesep) f.write("exit(0)" + os.linesep) env = os.environ.copy() env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".asyncio_history") env["PYTHONSTARTUP"] = script subprocess.check_call( [sys.executable, "-m", "asyncio"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, timeout=SHORT_TIMEOUT, ) @unittest.skipUnless(pty, "requires pty") def test_asyncio_repl_is_ok(self): m, s = pty.openpty() cmd = [sys.executable, "-I", "-m", "asyncio"] env = os.environ.copy() proc = subprocess.Popen( cmd, stdin=s, stdout=s, stderr=s, text=True, close_fds=True, env=env, ) os.close(s) os.write(m, b"await asyncio.sleep(0)\n") os.write(m, b"exit()\n") output = [] while select.select([m], [], [], SHORT_TIMEOUT)[0]: try: data = os.read(m, 1024).decode("utf-8") if not data: break except OSError: break output.append(data) os.close(m) try: exit_code = proc.wait(timeout=SHORT_TIMEOUT) except subprocess.TimeoutExpired: proc.kill() exit_code = proc.wait() self.assertEqual(exit_code, 0, "".join(output)) class TestInteractiveModeSyntaxErrors(unittest.TestCase): def test_interactive_syntax_error_correct_line(self): output = run_on_interactive_mode(dedent("""\ def f(): print(0) return yield 42 """)) traceback_lines = output.splitlines()[-4:-1] expected_lines = [ ' return yield 42', ' ^^^^^', 'SyntaxError: invalid syntax' ] self.assertEqual(traceback_lines, expected_lines) class TestAsyncioREPLContextVars(unittest.TestCase): def test_toplevel_contextvars_sync(self): user_input = dedent("""\ from contextvars import ContextVar var = ContextVar("var", default="failed") var.set("ok") """) p = spawn_repl("-m", "asyncio") p.stdin.write(user_input) user_input2 = dedent(""" print(f"toplevel contextvar test: {var.get()}") """) p.stdin.write(user_input2) output = kill_python(p) self.assertEqual(p.returncode, 0) expected = "toplevel contextvar test: ok" self.assertIn(expected, output, expected) def test_toplevel_contextvars_async(self): user_input = dedent("""\ from contextvars import ContextVar var = ContextVar('var', default='failed') """) p = spawn_repl("-m", "asyncio") p.stdin.write(user_input) user_input2 = "async def set_var(): var.set('ok')\n" p.stdin.write(user_input2) user_input3 = "await set_var()\n" p.stdin.write(user_input3) user_input4 = "print(f'toplevel contextvar test: {var.get()}')\n" p.stdin.write(user_input4) output = kill_python(p) self.assertEqual(p.returncode, 0) expected = "toplevel contextvar test: ok" self.assertIn(expected, output, expected) if __name__ == "__main__": unittest.main()