0
0
mirror of https://github.com/python/cpython.git synced 2024-11-28 16:45:42 +01:00
cpython/Lib/sqlite3/__main__.py
Erlend E. Aasland 0e8b3fc718
gh-108550: Speed up sqlite3 tests (#108551)
Refactor the CLI so we can easily invoke it and mock command-line
arguments. Adapt the CLI tests so we no longer have to launch a
separate process.

Disable the busy handler for all concurrency tests; we have full
control over the order of the SQLite C API calls, so we can safely
do this.

The sqlite3 test suite now completes ~8 times faster than before.

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
2023-08-28 14:17:34 +02:00

128 lines
3.8 KiB
Python

"""A simple SQLite CLI for the sqlite3 module.
Apart from using 'argparse' for the command-line interface,
this module implements the REPL as a thin wrapper around
the InteractiveConsole class from the 'code' stdlib module.
"""
import sqlite3
import sys
from argparse import ArgumentParser
from code import InteractiveConsole
from textwrap import dedent
def execute(c, sql, suppress_errors=True):
    """Run *sql* on *c* and print each resulting row to stdout.

    Shared by the REPL and by one-shot execution from the command line.
    'c' is a cursor or a connection; only its execute() method is used.
    'sql' is the SQL text to run.
    sqlite3 errors are reported on stderr instead of being raised; when
    'suppress_errors' is false the process then exits with status 1.
    """
    try:
        rows = c.execute(sql)
        for record in rows:
            print(record)
    except sqlite3.Error as exc:
        kind = type(exc).__name__
        try:
            report = f"{kind} ({exc.sqlite_errorname}): {exc}"
        except AttributeError:
            # Fall back when the exception has no sqlite_errorname attribute.
            report = f"{kind}: {exc}"
        print(report, file=sys.stderr)
        if suppress_errors:
            return
        sys.exit(1)
class SqliteInteractiveConsole(InteractiveConsole):
    """Interactive SQLite prompt built on code.InteractiveConsole."""

    def __init__(self, connection):
        """Keep *connection* and open the cursor that statements run on."""
        super().__init__()
        self._con = connection
        self._cur = connection.cursor()

    def runsource(self, source, filename="<input>", symbol="single"):
        """Handle one chunk of REPL input.

        The dot-commands ".version", ".help" and ".quit" are recognised only
        when they make up the entire input; anything else is treated as SQL
        and run on the cursor once it forms a complete statement.

        Return True if the SQL is still incomplete and more input is needed.
        Return False if the input has been handled.
        """
        if source == ".version":
            print(f"{sqlite3.sqlite_version}")
        elif source == ".help":
            print("Enter SQL code and press enter.")
        elif source == ".quit":
            sys.exit(0)
        elif sqlite3.complete_statement(source):
            execute(self._cur, source)
        else:
            # Statement not terminated yet; ask the console for more lines.
            return True
        return False
def main(*args):
    """Entry point of the sqlite3 command-line interface.

    Positional arguments are forwarded unchanged to
    ArgumentParser.parse_args(), so main() parses the process command line
    while main([...]) parses the given list of argument strings.

    When an SQL string is given it is executed once on the connection, and
    an sqlite3 error ends the process with exit status 1.  Otherwise the
    interactive REPL is started.  The connection is closed in either case
    and the function finishes by calling sys.exit(0).
    """
    cli = ArgumentParser(
        prog="python -m sqlite3",
        description="Python sqlite3 CLI",
    )
    cli.add_argument(
        "filename",
        nargs="?",
        type=str,
        default=":memory:",
        help=(
            "SQLite database to open (defaults to ':memory:'). "
            "A new database is created if the file does not previously exist."
        ),
    )
    cli.add_argument(
        "sql",
        nargs="?",
        type=str,
        help=(
            "An SQL query to execute. "
            "Any returned rows are printed to stdout."
        ),
    )
    cli.add_argument(
        "-v", "--version",
        action="version",
        version=f"SQLite version {sqlite3.sqlite_version}",
        help="Print underlying SQLite library version",
    )
    options = cli.parse_args(*args)

    db_name = (
        "a transient in-memory database"
        if options.filename == ":memory:"
        else repr(options.filename)
    )

    # Prepare REPL banner and prompts.
    use_ctrl_z = (
        sys.platform == "win32" and "idlelib.run" not in sys.modules
    )
    eofkey = "CTRL-Z" if use_ctrl_z else "CTRL-D"
    banner = dedent(f"""
        sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}
        Connected to {db_name}
        Each command will be run using execute() on the cursor.
        Type ".help" for more information; type ".quit" or {eofkey} to quit.
    """).strip()
    sys.ps1 = "sqlite> "
    sys.ps2 = " ... "

    con = sqlite3.connect(options.filename, isolation_level=None)
    try:
        if not options.sql:
            # No SQL provided; start the REPL.
            repl = SqliteInteractiveConsole(con)
            repl.interact(banner, exitmsg="")
        else:
            # SQL statement provided on the command-line; execute it directly.
            execute(con, options.sql, suppress_errors=False)
    finally:
        con.close()

    sys.exit(0)
if __name__ == "__main__":
    # main() hands its argument straight to ArgumentParser.parse_args(),
    # which expects only the real arguments.  Passing the full sys.argv
    # would make argv[0] (the script path) the 'filename' positional and
    # shift the actual database name into 'sql'.
    main(sys.argv[1:])