0
0
mirror of https://github.com/python/cpython.git synced 2024-11-21 21:09:37 +01:00
cpython/Lib/sqlite3/dump.py
Erlend E. Aasland e38b43c213
gh-118221: Always use the default row factory in sqlite3.iterdump() (#118223)
sqlite3.iterdump() depends on the row factory returning rows as tuples;
it fails with custom row factories, such as a dict factory.

With this commit, we explicitly reset the row factory of the cursor used
by iterdump(), so we always get predictable results. This does not
affect the row factory of the parent connection.

Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
2024-04-25 10:11:45 +02:00

114 lines
4.1 KiB
Python

# Mimic the sqlite3 console shell's .dump command
# Author: Paul Kippes <kippesp@gmail.com>
# Every identifier in the generated SQL is quoted, following this advice in
# the SQLite documentation: "SQLite adds new keywords from time to time when it
# takes on new features. So to prevent your code from being broken by
# future enhancements, you should normally quote any identifier that
# is an English language word, even if you do not have to."
def _quote_name(name):
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are escaped by doubling them, so the result can
    be spliced into SQL text wherever an identifier is expected.
    """
    escaped = name.replace('"', '""')
    return f'"{escaped}"'
def _quote_value(value):
    """Return the string *value* as a single-quoted SQL string literal.

    Embedded single quotes are escaped by doubling them.  *value* must be a
    ``str`` (it is processed with ``str.replace``).
    """
    escaped = value.replace("'", "''")
    return f"'{escaped}'"
def _iterdump(connection, *, filter=None):
    """
    Returns an iterator to the dump of the database in an SQL text format.

    Used to produce an SQL dump of the database.  Useful to save an in-memory
    database for later restoration.  This function should not be called
    directly but instead called from the Connection method, iterdump().

    *connection* is the database connection to dump; a fresh cursor is
    created from it and that cursor's row factory is reset, leaving the
    connection's own row factory untouched.

    *filter*, if given and truthy, is an SQL ``LIKE`` pattern; only database
    objects whose name matches it are dumped.

    Each yielded item is one complete SQL statement terminated by ``;``.
    """
    writeable_schema = False
    cu = connection.cursor()
    cu.row_factory = None  # Make sure we get predictable (tuple) rows.

    # Disable foreign key constraints if there is any foreign key violation,
    # otherwise restoring the dump would fail on the violating rows.
    violations = cu.execute("PRAGMA foreign_key_check").fetchall()
    if violations:
        yield 'PRAGMA foreign_keys=OFF;'
    yield 'BEGIN TRANSACTION;'

    if filter:
        # Return database objects which match the filter pattern.
        filter_name_clause = 'AND "name" LIKE ?'
        params = [filter]
    else:
        filter_name_clause = ""
        params = []

    # sqlite_master table contains the SQL CREATE statements for the database.
    q = f"""
        SELECT "name", "type", "sql"
        FROM "sqlite_master"
            WHERE "sql" NOT NULL AND
            "type" == 'table'
            {filter_name_clause}
            ORDER BY "name"
        """
    schema_res = cu.execute(q, params)
    sqlite_sequence = []
    for table_name, _type, sql in schema_res.fetchall():
        if table_name == 'sqlite_sequence':
            # Collected now, emitted at the very end of the transaction
            # (see the gh-79009 note below).
            rows = cu.execute('SELECT * FROM "sqlite_sequence";')
            sqlite_sequence = ['DELETE FROM "sqlite_sequence"']
            sqlite_sequence += [
                f'INSERT INTO "sqlite_sequence" VALUES({_quote_value(seq_name)},{seq_value})'
                for seq_name, seq_value in rows.fetchall()
            ]
            continue
        elif table_name == 'sqlite_stat1':
            # No CREATE statement is emitted; the rows of sqlite_stat1 are
            # still dumped by the INSERT generation below.
            yield 'ANALYZE "sqlite_master";'
        elif table_name.startswith('sqlite_'):
            continue
        elif sql.startswith('CREATE VIRTUAL TABLE'):
            # Virtual tables are restored by writing their schema row
            # directly, which requires writable_schema to be enabled once.
            if not writeable_schema:
                writeable_schema = True
                yield 'PRAGMA writable_schema=ON;'
            yield ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
                   "VALUES('table',{0},{0},0,{1});".format(
                       _quote_value(table_name),
                       _quote_value(sql),
                   ))
        else:
            yield '{0};'.format(sql)

        # Build the insert statement for each row of the current table.
        table_name_ident = _quote_name(table_name)
        res = cu.execute(f'PRAGMA table_info({table_name_ident})')
        column_names = [str(table_info[1]) for table_info in res.fetchall()]
        # The statement prefix is embedded in the query as an SQL string
        # literal, so it must be escaped as one: a table name containing a
        # single quote would otherwise terminate the literal early and make
        # the SELECT below malformed.
        insert_prefix = _quote_value(f'INSERT INTO {table_name_ident} VALUES(')
        quoted_columns = "','".join(
            "||quote({0})||".format(_quote_name(col)) for col in column_names
        )
        q = "SELECT {0}{1}')' FROM {2};".format(
            insert_prefix,
            quoted_columns,
            table_name_ident,
        )
        query_res = cu.execute(q)
        for row in query_res:
            yield "{0};".format(row[0])

    # Now when the type is 'index', 'trigger', or 'view'
    q = f"""
        SELECT "name", "type", "sql"
        FROM "sqlite_master"
            WHERE "sql" NOT NULL AND
            "type" IN ('index', 'trigger', 'view')
            {filter_name_clause}
        """
    schema_res = cu.execute(q, params)
    for _name, _type, sql in schema_res.fetchall():
        yield '{0};'.format(sql)

    if writeable_schema:
        yield 'PRAGMA writable_schema=OFF;'

    # gh-79009: Yield statements concerning the sqlite_sequence table at the
    # end of the transaction.
    for row in sqlite_sequence:
        yield '{0};'.format(row)

    yield 'COMMIT;'