mirror of
https://github.com/python/cpython.git
synced 2024-11-21 21:09:37 +01:00
e38b43c213
sqlite3.iterdump() depends on the row factory returning rows as tuples; it fails with custom row factories such as a dict factory. With this commit, we explicitly reset the row factory of the cursor used by iterdump(), so we always get predictable results. This does not affect the row factory of the parent connection. Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com> Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
114 lines
4.1 KiB
Python
114 lines
4.1 KiB
Python
# Mimic the sqlite3 console shell's .dump command
|
|
# Author: Paul Kippes <kippesp@gmail.com>
|
|
|
|
# Every identifier in SQL is quoted, following this advice from the SQLite
# documentation: "SQLite adds new keywords from time to time when it
# takes on new features. So to prevent your code from being broken by
# future enhancements, you should normally quote any identifier that
# is an English language word, even if you do not have to."
|
|
|
|
def _quote_name(name):
    """Return *name* as a double-quoted SQL identifier.

    Any double quote embedded in *name* is doubled so that it cannot
    terminate the quoted identifier early.
    """
    escaped = name.replace('"', '""')
    return f'"{escaped}"'
|
|
|
|
|
|
def _quote_value(value):
    """Return *value* as a single-quoted SQL string literal.

    Any single quote embedded in *value* is doubled so that it cannot
    terminate the literal early.
    """
    escaped = value.replace("'", "''")
    return f"'{escaped}'"
|
|
|
|
|
|
def _iterdump(connection, *, filter=None):
    """
    Returns an iterator to the dump of the database in an SQL text format.

    Used to produce an SQL dump of the database. Useful to save an in-memory
    database for later restoration. This function should not be called
    directly but instead called from the Connection method, iterdump().

    Parameters:
        connection: object whose cursor() method returns the cursor used to
            run every query issued here.
        filter: optional pattern; when truthy, only objects whose "name"
            matches it with the SQL LIKE operator are dumped.

    Yields:
        SQL statements as strings, each terminated by ';', starting with
        'BEGIN TRANSACTION;' (optionally preceded by
        'PRAGMA foreign_keys=OFF;') and ending with 'COMMIT;'.
    """

    writeable_schema = False
    cu = connection.cursor()
    cu.row_factory = None  # Make sure we get predictable results.
    # Disable foreign key constraints, if there is any foreign key violation.
    violations = cu.execute("PRAGMA foreign_key_check").fetchall()
    if violations:
        yield 'PRAGMA foreign_keys=OFF;'
    yield 'BEGIN TRANSACTION;'

    if filter:
        # Return database objects which match the filter pattern.
        filter_name_clause = 'AND "name" LIKE ?'
        params = [filter]
    else:
        filter_name_clause = ""
        params = []
    # sqlite_master table contains the SQL CREATE statements for the database.
    q = f"""
        SELECT "name", "type", "sql"
        FROM "sqlite_master"
            WHERE "sql" NOT NULL AND
            "type" == 'table'
            {filter_name_clause}
            ORDER BY "name"
        """
    schema_res = cu.execute(q, params)
    sqlite_sequence = []
    for table_name, _type, sql in schema_res.fetchall():
        if table_name == 'sqlite_sequence':
            # Collected now, emitted just before COMMIT (see gh-79009 below).
            rows = cu.execute('SELECT * FROM "sqlite_sequence";')
            sqlite_sequence = ['DELETE FROM "sqlite_sequence"']
            sqlite_sequence += [
                f'INSERT INTO "sqlite_sequence" VALUES({_quote_value(seq_table)},{seq_value})'
                for seq_table, seq_value in rows.fetchall()
            ]
            continue
        elif table_name == 'sqlite_stat1':
            yield 'ANALYZE "sqlite_master";'
        elif table_name.startswith('sqlite_'):
            continue
        elif sql.startswith('CREATE VIRTUAL TABLE'):
            if not writeable_schema:
                writeable_schema = True
                yield 'PRAGMA writable_schema=ON;'
            yield ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
                   "VALUES('table',{0},{0},0,{1});".format(
                       _quote_value(table_name),
                       _quote_value(sql),
                   ))
        else:
            yield '{0};'.format(sql)

        # Build the insert statement for each row of the current table
        table_name_ident = _quote_name(table_name)
        res = cu.execute(f'PRAGMA table_info({table_name_ident})')
        column_names = [str(table_info[1]) for table_info in res.fetchall()]
        # The first placeholder sits inside a single-quoted SQL string
        # literal, so any "'" in the identifier must be doubled there or a
        # table name containing a single quote would end the literal early
        # and make the SELECT a syntax error.  The FROM clause uses the plain
        # double-quoted identifier.
        q = "SELECT 'INSERT INTO {0} VALUES('{1}')' FROM {2};".format(
            table_name_ident.replace("'", "''"),
            "','".join(
                "||quote({0})||".format(_quote_name(col)) for col in column_names
            ),
            table_name_ident,
        )
        query_res = cu.execute(q)
        for row in query_res:
            yield "{0};".format(row[0])

    # Now when the type is 'index', 'trigger', or 'view'
    q = f"""
        SELECT "name", "type", "sql"
        FROM "sqlite_master"
            WHERE "sql" NOT NULL AND
            "type" IN ('index', 'trigger', 'view')
            {filter_name_clause}
        """
    schema_res = cu.execute(q, params)
    for _name, _type, sql in schema_res.fetchall():
        yield '{0};'.format(sql)

    if writeable_schema:
        yield 'PRAGMA writable_schema=OFF;'

    # gh-79009: Yield statements concerning the sqlite_sequence table at the
    # end of the transaction.
    for row in sqlite_sequence:
        yield '{0};'.format(row)

    yield 'COMMIT;'
|