mirror of
https://github.com/python/cpython.git
synced 2024-11-24 17:47:13 +01:00
gh-100414: Add SQLite backend to dbm (#114481)
Co-authored-by: Raymond Hettinger <rhettinger@users.noreply.github.com> Co-authored-by: Serhiy Storchaka <storchaka@gmail.com> Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
This commit is contained in:
parent
57e4c81ae1
commit
dd5e4d9078
@ -8,8 +8,13 @@
|
||||
|
||||
--------------
|
||||
|
||||
:mod:`dbm` is a generic interface to variants of the DBM database ---
|
||||
:mod:`dbm.gnu` or :mod:`dbm.ndbm`. If none of these modules is installed, the
|
||||
:mod:`dbm` is a generic interface to variants of the DBM database:
|
||||
|
||||
* :mod:`dbm.sqlite3`
|
||||
* :mod:`dbm.gnu`
|
||||
* :mod:`dbm.ndbm`
|
||||
|
||||
If none of these modules are installed, the
|
||||
slow-but-simple implementation in module :mod:`dbm.dumb` will be used. There
|
||||
is a `third party interface <https://www.jcea.es/programacion/pybsddb.htm>`_ to
|
||||
the Oracle Berkeley DB.
|
||||
@ -25,8 +30,8 @@ the Oracle Berkeley DB.
|
||||
.. function:: whichdb(filename)
|
||||
|
||||
This function attempts to guess which of the several simple database modules
|
||||
available --- :mod:`dbm.gnu`, :mod:`dbm.ndbm` or :mod:`dbm.dumb` --- should
|
||||
be used to open a given file.
|
||||
available --- :mod:`dbm.sqlite3`, :mod:`dbm.gnu`, :mod:`dbm.ndbm`,
|
||||
or :mod:`dbm.dumb` --- should be used to open a given file.
|
||||
|
||||
Return one of the following values:
|
||||
|
||||
@ -144,6 +149,46 @@ then prints out the contents of the database::
|
||||
|
||||
The individual submodules are described in the following sections.
|
||||
|
||||
:mod:`dbm.sqlite3` --- SQLite backend for dbm
|
||||
---------------------------------------------
|
||||
|
||||
.. module:: dbm.sqlite3
|
||||
:platform: All
|
||||
:synopsis: SQLite backend for dbm
|
||||
|
||||
.. versionadded:: 3.13
|
||||
|
||||
**Source code:** :source:`Lib/dbm/sqlite3.py`
|
||||
|
||||
--------------
|
||||
|
||||
This module uses the standard library :mod:`sqlite3` module to provide an
|
||||
SQLite backend for the :mod:`dbm` module.
|
||||
The files created by :mod:`dbm.sqlite3` can thus be opened by :mod:`sqlite3`,
|
||||
or any other SQLite browser, including the SQLite CLI.
|
||||
|
||||
.. function:: open(filename, /, flag="r", mode=0o666)
|
||||
|
||||
Open an SQLite database.
|
||||
The returned object behaves like a :term:`mapping`,
|
||||
implements a :meth:`!close` method,
|
||||
and supports a "closing" context manager via the :keyword:`with` keyword.
|
||||
|
||||
:param filename:
|
||||
The path to the database to be opened.
|
||||
:type filename: :term:`path-like object`
|
||||
|
||||
:param str flag:
|
||||
|
||||
* ``'r'`` (default): |flag_r|
|
||||
* ``'w'``: |flag_w|
|
||||
* ``'c'``: |flag_c|
|
||||
* ``'n'``: |flag_n|
|
||||
|
||||
:param mode:
|
||||
The Unix file access mode of the file (default: octal ``0o666``),
|
||||
used only when the database has to be created.
|
||||
|
||||
|
||||
:mod:`dbm.gnu` --- GNU database manager
|
||||
---------------------------------------
|
||||
|
@ -231,6 +231,16 @@ dis
|
||||
the ``show_offsets`` parameter.
|
||||
(Contributed by Irit Katriel in :gh:`112137`.)
|
||||
|
||||
dbm
|
||||
---
|
||||
|
||||
* Add :meth:`dbm.gnu.gdbm.clear` and :meth:`dbm.ndbm.ndbm.clear` methods that remove all items
|
||||
from the database.
|
||||
(Contributed by Donghee Na in :gh:`107122`.)
|
||||
|
||||
* Add new :mod:`dbm.sqlite3` backend.
|
||||
(Contributed by Raymond Hettinger and Erlend E. Aasland in :gh:`100414`.)
|
||||
|
||||
doctest
|
||||
-------
|
||||
|
||||
|
@ -5,7 +5,7 @@ Use
|
||||
import dbm
|
||||
d = dbm.open(file, 'w', 0o666)
|
||||
|
||||
The returned object is a dbm.gnu, dbm.ndbm or dbm.dumb object, dependent on the
|
||||
The returned object is a dbm.sqlite3, dbm.gnu, dbm.ndbm or dbm.dumb database object, dependent on the
|
||||
type of database being opened (determined by the whichdb function) in the case
|
||||
of an existing dbm. If the dbm does not exist and the create or new flag ('c'
|
||||
or 'n') was specified, the dbm type will be determined by the availability of
|
||||
@ -38,7 +38,7 @@ import sys
|
||||
class error(Exception):
|
||||
pass
|
||||
|
||||
_names = ['dbm.gnu', 'dbm.ndbm', 'dbm.dumb']
|
||||
_names = ['dbm.gnu', 'dbm.ndbm', 'dbm.sqlite3', 'dbm.dumb']
|
||||
_defaultmod = None
|
||||
_modules = {}
|
||||
|
||||
@ -164,6 +164,10 @@ def whichdb(filename):
|
||||
if len(s) != 4:
|
||||
return ""
|
||||
|
||||
# Check for SQLite3 header string.
|
||||
if s16 == b"SQLite format 3\0":
|
||||
return "dbm.sqlite3"
|
||||
|
||||
# Convert to 4-byte int in native byte order -- return "" if impossible
|
||||
try:
|
||||
(magic,) = struct.unpack("=l", s)
|
||||
|
141
Lib/dbm/sqlite3.py
Normal file
141
Lib/dbm/sqlite3.py
Normal file
@ -0,0 +1,141 @@
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from contextlib import suppress, closing
|
||||
from collections.abc import MutableMapping
|
||||
|
||||
BUILD_TABLE = """
|
||||
CREATE TABLE IF NOT EXISTS Dict (
|
||||
key BLOB UNIQUE NOT NULL,
|
||||
value BLOB NOT NULL
|
||||
)
|
||||
"""
|
||||
GET_SIZE = "SELECT COUNT (key) FROM Dict"
|
||||
LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)"
|
||||
STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))"
|
||||
DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)"
|
||||
ITER_KEYS = "SELECT key FROM Dict"
|
||||
|
||||
|
||||
class error(OSError):
|
||||
pass
|
||||
|
||||
|
||||
_ERR_CLOSED = "DBM object has already been closed"
|
||||
_ERR_REINIT = "DBM object does not support reinitialization"
|
||||
|
||||
|
||||
def _normalize_uri(path):
|
||||
path = Path(path)
|
||||
uri = path.absolute().as_uri()
|
||||
while "//" in uri:
|
||||
uri = uri.replace("//", "/")
|
||||
return uri
|
||||
|
||||
|
||||
class _Database(MutableMapping):
|
||||
|
||||
def __init__(self, path, /, *, flag, mode):
|
||||
if hasattr(self, "_cx"):
|
||||
raise error(_ERR_REINIT)
|
||||
|
||||
path = os.fsdecode(path)
|
||||
match flag:
|
||||
case "r":
|
||||
flag = "ro"
|
||||
case "w":
|
||||
flag = "rw"
|
||||
case "c":
|
||||
flag = "rwc"
|
||||
Path(path).touch(mode=mode, exist_ok=True)
|
||||
case "n":
|
||||
flag = "rwc"
|
||||
Path(path).unlink(missing_ok=True)
|
||||
Path(path).touch(mode=mode)
|
||||
case _:
|
||||
raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', "
|
||||
f"not {flag!r}")
|
||||
|
||||
# We use the URI format when opening the database.
|
||||
uri = _normalize_uri(path)
|
||||
uri = f"{uri}?mode={flag}"
|
||||
|
||||
try:
|
||||
self._cx = sqlite3.connect(uri, autocommit=True, uri=True)
|
||||
except sqlite3.Error as exc:
|
||||
raise error(str(exc))
|
||||
|
||||
# This is an optimization only; it's ok if it fails.
|
||||
with suppress(sqlite3.OperationalError):
|
||||
self._cx.execute("PRAGMA journal_mode = wal")
|
||||
|
||||
if flag == "rwc":
|
||||
self._execute(BUILD_TABLE)
|
||||
|
||||
def _execute(self, *args, **kwargs):
|
||||
if not self._cx:
|
||||
raise error(_ERR_CLOSED)
|
||||
try:
|
||||
return closing(self._cx.execute(*args, **kwargs))
|
||||
except sqlite3.Error as exc:
|
||||
raise error(str(exc))
|
||||
|
||||
def __len__(self):
|
||||
with self._execute(GET_SIZE) as cu:
|
||||
row = cu.fetchone()
|
||||
return row[0]
|
||||
|
||||
def __getitem__(self, key):
|
||||
with self._execute(LOOKUP_KEY, (key,)) as cu:
|
||||
row = cu.fetchone()
|
||||
if not row:
|
||||
raise KeyError(key)
|
||||
return row[0]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._execute(STORE_KV, (key, value))
|
||||
|
||||
def __delitem__(self, key):
|
||||
with self._execute(DELETE_KEY, (key,)) as cu:
|
||||
if not cu.rowcount:
|
||||
raise KeyError(key)
|
||||
|
||||
def __iter__(self):
|
||||
try:
|
||||
with self._execute(ITER_KEYS) as cu:
|
||||
for row in cu:
|
||||
yield row[0]
|
||||
except sqlite3.Error as exc:
|
||||
raise error(str(exc))
|
||||
|
||||
def close(self):
|
||||
if self._cx:
|
||||
self._cx.close()
|
||||
self._cx = None
|
||||
|
||||
def keys(self):
|
||||
return list(super().keys())
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.close()
|
||||
|
||||
|
||||
def open(filename, /, flag="r", mode=0o666):
|
||||
"""Open a dbm.sqlite3 database and return the dbm object.
|
||||
|
||||
The 'filename' parameter is the name of the database file.
|
||||
|
||||
The optional 'flag' parameter can be one of ...:
|
||||
'r' (default): open an existing database for read only access
|
||||
'w': open an existing database for read/write access
|
||||
'c': create a database if it does not exist; open for read/write access
|
||||
'n': always create a new, empty database; open for read/write access
|
||||
|
||||
The optional 'mode' parameter is the Unix file access mode of the database;
|
||||
only used when creating a new database. Default: 0o666.
|
||||
"""
|
||||
return _Database(filename, flag=flag, mode=mode)
|
@ -6,6 +6,13 @@ import os
|
||||
from test.support import import_helper
|
||||
from test.support import os_helper
|
||||
|
||||
|
||||
try:
|
||||
from dbm import sqlite3 as dbm_sqlite3
|
||||
except ImportError:
|
||||
dbm_sqlite3 = None
|
||||
|
||||
|
||||
try:
|
||||
from dbm import ndbm
|
||||
except ImportError:
|
||||
@ -213,6 +220,27 @@ class WhichDBTestCase(unittest.TestCase):
|
||||
for path in fnames:
|
||||
self.assertIsNone(self.dbm.whichdb(path))
|
||||
|
||||
@unittest.skipUnless(dbm_sqlite3, reason='Test requires dbm.sqlite3')
|
||||
def test_whichdb_sqlite3(self):
|
||||
# Databases created by dbm.sqlite3 are detected correctly.
|
||||
with dbm_sqlite3.open(_fname, "c") as db:
|
||||
db["key"] = "value"
|
||||
self.assertEqual(self.dbm.whichdb(_fname), "dbm.sqlite3")
|
||||
|
||||
@unittest.skipUnless(dbm_sqlite3, reason='Test requires dbm.sqlite3')
|
||||
def test_whichdb_sqlite3_existing_db(self):
|
||||
# Existing sqlite3 databases are detected correctly.
|
||||
sqlite3 = import_helper.import_module("sqlite3")
|
||||
try:
|
||||
# Create an empty database.
|
||||
with sqlite3.connect(_fname) as cx:
|
||||
cx.execute("CREATE TABLE dummy(database)")
|
||||
cx.commit()
|
||||
finally:
|
||||
cx.close()
|
||||
self.assertEqual(self.dbm.whichdb(_fname), "dbm.sqlite3")
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.addCleanup(cleaunup_test_dir)
|
||||
setup_test_dir()
|
||||
|
308
Lib/test/test_dbm_sqlite3.py
Normal file
308
Lib/test/test_dbm_sqlite3.py
Normal file
@ -0,0 +1,308 @@
|
||||
import sqlite3
|
||||
import sys
|
||||
import test.support
|
||||
import unittest
|
||||
from contextlib import closing
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from test.support import cpython_only, import_helper, os_helper
|
||||
|
||||
|
||||
dbm_sqlite3 = import_helper.import_module("dbm.sqlite3")
|
||||
from dbm.sqlite3 import _normalize_uri
|
||||
|
||||
|
||||
class _SQLiteDbmTests(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.filename = os_helper.TESTFN
|
||||
db = dbm_sqlite3.open(self.filename, "c")
|
||||
db.close()
|
||||
|
||||
def tearDown(self):
|
||||
for suffix in "", "-wal", "-shm":
|
||||
os_helper.unlink(self.filename + suffix)
|
||||
|
||||
|
||||
class URI(unittest.TestCase):
|
||||
|
||||
def test_uri_substitutions(self):
|
||||
dataset = (
|
||||
("/absolute/////b/c", "/absolute/b/c"),
|
||||
("PRE#MID##END", "PRE%23MID%23%23END"),
|
||||
("%#?%%#", "%25%23%3F%25%25%23"),
|
||||
)
|
||||
for path, normalized in dataset:
|
||||
with self.subTest(path=path, normalized=normalized):
|
||||
self.assertTrue(_normalize_uri(path).endswith(normalized))
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
|
||||
def test_uri_windows(self):
|
||||
dataset = (
|
||||
# Relative subdir.
|
||||
(r"2018\January.xlsx",
|
||||
"2018/January.xlsx"),
|
||||
# Absolute with drive letter.
|
||||
(r"C:\Projects\apilibrary\apilibrary.sln",
|
||||
"/C:/Projects/apilibrary/apilibrary.sln"),
|
||||
# Relative with drive letter.
|
||||
(r"C:Projects\apilibrary\apilibrary.sln",
|
||||
"/C:Projects/apilibrary/apilibrary.sln"),
|
||||
)
|
||||
for path, normalized in dataset:
|
||||
with self.subTest(path=path, normalized=normalized):
|
||||
if not Path(path).is_absolute():
|
||||
self.skipTest(f"skipping relative path: {path!r}")
|
||||
self.assertTrue(_normalize_uri(path).endswith(normalized))
|
||||
|
||||
|
||||
class ReadOnly(_SQLiteDbmTests):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
with dbm_sqlite3.open(self.filename, "w") as db:
|
||||
db[b"key1"] = "value1"
|
||||
db[b"key2"] = "value2"
|
||||
self.db = dbm_sqlite3.open(self.filename, "r")
|
||||
|
||||
def tearDown(self):
|
||||
self.db.close()
|
||||
super().tearDown()
|
||||
|
||||
def test_readonly_read(self):
|
||||
self.assertEqual(self.db[b"key1"], b"value1")
|
||||
self.assertEqual(self.db[b"key2"], b"value2")
|
||||
|
||||
def test_readonly_write(self):
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
self.db[b"new"] = "value"
|
||||
|
||||
def test_readonly_delete(self):
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
del self.db[b"key1"]
|
||||
|
||||
def test_readonly_keys(self):
|
||||
self.assertEqual(self.db.keys(), [b"key1", b"key2"])
|
||||
|
||||
def test_readonly_iter(self):
|
||||
self.assertEqual([k for k in self.db], [b"key1", b"key2"])
|
||||
|
||||
|
||||
class ReadWrite(_SQLiteDbmTests):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.db = dbm_sqlite3.open(self.filename, "w")
|
||||
|
||||
def tearDown(self):
|
||||
self.db.close()
|
||||
super().tearDown()
|
||||
|
||||
def db_content(self):
|
||||
with closing(sqlite3.connect(self.filename)) as cx:
|
||||
keys = [r[0] for r in cx.execute("SELECT key FROM Dict")]
|
||||
vals = [r[0] for r in cx.execute("SELECT value FROM Dict")]
|
||||
return keys, vals
|
||||
|
||||
def test_readwrite_unique_key(self):
|
||||
self.db["key"] = "value"
|
||||
self.db["key"] = "other"
|
||||
keys, vals = self.db_content()
|
||||
self.assertEqual(keys, [b"key"])
|
||||
self.assertEqual(vals, [b"other"])
|
||||
|
||||
def test_readwrite_delete(self):
|
||||
self.db["key"] = "value"
|
||||
self.db["new"] = "other"
|
||||
|
||||
del self.db[b"new"]
|
||||
keys, vals = self.db_content()
|
||||
self.assertEqual(keys, [b"key"])
|
||||
self.assertEqual(vals, [b"value"])
|
||||
|
||||
del self.db[b"key"]
|
||||
keys, vals = self.db_content()
|
||||
self.assertEqual(keys, [])
|
||||
self.assertEqual(vals, [])
|
||||
|
||||
def test_readwrite_null_key(self):
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
self.db[None] = "value"
|
||||
|
||||
def test_readwrite_null_value(self):
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
self.db[b"key"] = None
|
||||
|
||||
|
||||
class Misuse(_SQLiteDbmTests):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.db = dbm_sqlite3.open(self.filename, "w")
|
||||
|
||||
def tearDown(self):
|
||||
self.db.close()
|
||||
super().tearDown()
|
||||
|
||||
def test_misuse_double_create(self):
|
||||
self.db["key"] = "value"
|
||||
with dbm_sqlite3.open(self.filename, "c") as db:
|
||||
self.assertEqual(db[b"key"], b"value")
|
||||
|
||||
def test_misuse_double_close(self):
|
||||
self.db.close()
|
||||
|
||||
def test_misuse_invalid_flag(self):
|
||||
regex = "must be.*'r'.*'w'.*'c'.*'n', not 'invalid'"
|
||||
with self.assertRaisesRegex(ValueError, regex):
|
||||
dbm_sqlite3.open(self.filename, flag="invalid")
|
||||
|
||||
def test_misuse_double_delete(self):
|
||||
self.db["key"] = "value"
|
||||
del self.db[b"key"]
|
||||
with self.assertRaises(KeyError):
|
||||
del self.db[b"key"]
|
||||
|
||||
def test_misuse_invalid_key(self):
|
||||
with self.assertRaises(KeyError):
|
||||
self.db[b"key"]
|
||||
|
||||
def test_misuse_iter_close1(self):
|
||||
self.db["1"] = 1
|
||||
it = iter(self.db)
|
||||
self.db.close()
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
next(it)
|
||||
|
||||
def test_misuse_iter_close2(self):
|
||||
self.db["1"] = 1
|
||||
self.db["2"] = 2
|
||||
it = iter(self.db)
|
||||
next(it)
|
||||
self.db.close()
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
next(it)
|
||||
|
||||
def test_misuse_use_after_close(self):
|
||||
self.db.close()
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
self.db[b"read"]
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
self.db[b"write"] = "value"
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
del self.db[b"del"]
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
len(self.db)
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
self.db.keys()
|
||||
|
||||
def test_misuse_reinit(self):
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
self.db.__init__("new.db", flag="n", mode=0o666)
|
||||
|
||||
def test_misuse_empty_filename(self):
|
||||
for flag in "r", "w", "c", "n":
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
db = dbm_sqlite3.open("", flag="c")
|
||||
|
||||
|
||||
class DataTypes(_SQLiteDbmTests):
|
||||
|
||||
dataset = (
|
||||
# (raw, coerced)
|
||||
(42, b"42"),
|
||||
(3.14, b"3.14"),
|
||||
("string", b"string"),
|
||||
(b"bytes", b"bytes"),
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.db = dbm_sqlite3.open(self.filename, "w")
|
||||
|
||||
def tearDown(self):
|
||||
self.db.close()
|
||||
super().tearDown()
|
||||
|
||||
def test_datatypes_values(self):
|
||||
for raw, coerced in self.dataset:
|
||||
with self.subTest(raw=raw, coerced=coerced):
|
||||
self.db["key"] = raw
|
||||
self.assertEqual(self.db[b"key"], coerced)
|
||||
|
||||
def test_datatypes_keys(self):
|
||||
for raw, coerced in self.dataset:
|
||||
with self.subTest(raw=raw, coerced=coerced):
|
||||
self.db[raw] = "value"
|
||||
self.assertEqual(self.db[coerced], b"value")
|
||||
# Raw keys are silently coerced to bytes.
|
||||
self.assertEqual(self.db[raw], b"value")
|
||||
del self.db[raw]
|
||||
|
||||
def test_datatypes_replace_coerced(self):
|
||||
self.db["10"] = "value"
|
||||
self.db[b"10"] = "value"
|
||||
self.db[10] = "value"
|
||||
self.assertEqual(self.db.keys(), [b"10"])
|
||||
|
||||
|
||||
class CorruptDatabase(_SQLiteDbmTests):
|
||||
"""Verify that database exceptions are raised as dbm.sqlite3.error."""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
with closing(sqlite3.connect(self.filename)) as cx:
|
||||
with cx:
|
||||
cx.execute("DROP TABLE IF EXISTS Dict")
|
||||
cx.execute("CREATE TABLE Dict (invalid_schema)")
|
||||
|
||||
def check(self, flag, fn, should_succeed=False):
|
||||
with closing(dbm_sqlite3.open(self.filename, flag)) as db:
|
||||
with self.assertRaises(dbm_sqlite3.error):
|
||||
fn(db)
|
||||
|
||||
@staticmethod
|
||||
def read(db):
|
||||
return db["key"]
|
||||
|
||||
@staticmethod
|
||||
def write(db):
|
||||
db["key"] = "value"
|
||||
|
||||
@staticmethod
|
||||
def iter(db):
|
||||
next(iter(db))
|
||||
|
||||
@staticmethod
|
||||
def keys(db):
|
||||
db.keys()
|
||||
|
||||
@staticmethod
|
||||
def del_(db):
|
||||
del db["key"]
|
||||
|
||||
@staticmethod
|
||||
def len_(db):
|
||||
len(db)
|
||||
|
||||
def test_corrupt_readwrite(self):
|
||||
for flag in "r", "w", "c":
|
||||
with self.subTest(flag=flag):
|
||||
check = partial(self.check, flag=flag)
|
||||
check(fn=self.read)
|
||||
check(fn=self.write)
|
||||
check(fn=self.iter)
|
||||
check(fn=self.keys)
|
||||
check(fn=self.del_)
|
||||
check(fn=self.len_)
|
||||
|
||||
def test_corrupt_force_new(self):
|
||||
with closing(dbm_sqlite3.open(self.filename, "n")) as db:
|
||||
db["foo"] = "write"
|
||||
_ = db[b"foo"]
|
||||
next(iter(db))
|
||||
del db[b"foo"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
@ -0,0 +1,2 @@
|
||||
Add :mod:`dbm.sqlite3` as a backend to :mod:`dbm`.
|
||||
Patch by Raymond Hettinger and Erlend E. Aasland.
|
Loading…
Reference in New Issue
Block a user