# Adapted from test_file.py by Daniel Stutzbach import sys import os import io import errno import unittest from array import array from weakref import proxy from functools import wraps from test.support import ( cpython_only, swap_attr, gc_collect, is_emscripten, is_wasi, infinite_recursion, strace_helper ) from test.support.os_helper import ( TESTFN, TESTFN_ASCII, TESTFN_UNICODE, make_bad_fd, ) from test.support.warnings_helper import check_warnings from test.support.import_helper import import_module from collections import UserList import _io # C implementation of io import _pyio # Python implementation of io _strace_flags=["--trace=%file,%desc"] class AutoFileTests: # file tests for which a test file is automatically set up def setUp(self): self.f = self.FileIO(TESTFN, 'w') def tearDown(self): if self.f: self.f.close() os.remove(TESTFN) def testWeakRefs(self): # verify weak references p = proxy(self.f) p.write(bytes(range(10))) self.assertEqual(self.f.tell(), p.tell()) self.f.close() self.f = None gc_collect() # For PyPy or other GCs. self.assertRaises(ReferenceError, getattr, p, 'tell') def testSeekTell(self): self.f.write(bytes(range(20))) self.assertEqual(self.f.tell(), 20) self.f.seek(0) self.assertEqual(self.f.tell(), 0) self.f.seek(10) self.assertEqual(self.f.tell(), 10) self.f.seek(5, 1) self.assertEqual(self.f.tell(), 15) self.f.seek(-5, 1) self.assertEqual(self.f.tell(), 10) self.f.seek(-5, 2) self.assertEqual(self.f.tell(), 15) def testAttributes(self): # verify expected attributes exist f = self.f self.assertEqual(f.mode, "wb") self.assertEqual(f.closed, False) # verify the attributes are readonly for attr in 'mode', 'closed': self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops') @unittest.skipIf(is_wasi, "WASI does not expose st_blksize.") def testBlksize(self): # test private _blksize attribute blksize = io.DEFAULT_BUFFER_SIZE # try to get preferred blksize from stat.st_blksize, if available if hasattr(os, 'fstat'): fst = os.fstat(self.f.fileno()) blksize = getattr(fst, 'st_blksize', blksize) self.assertEqual(self.f._blksize, blksize) # verify readinto def testReadintoByteArray(self): self.f.write(bytes([1, 2, 0, 255])) self.f.close() ba = bytearray(b'abcdefgh') with self.FileIO(TESTFN, 'r') as f: n = f.readinto(ba) self.assertEqual(ba, b'\x01\x02\x00\xffefgh') self.assertEqual(n, 4) def _testReadintoMemoryview(self): self.f.write(bytes([1, 2, 0, 255])) self.f.close() m = memoryview(bytearray(b'abcdefgh')) with self.FileIO(TESTFN, 'r') as f: n = f.readinto(m) self.assertEqual(m, b'\x01\x02\x00\xffefgh') self.assertEqual(n, 4) m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2]) with self.FileIO(TESTFN, 'r') as f: n = f.readinto(m) self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh') self.assertEqual(n, 4) def _testReadintoArray(self): self.f.write(bytes([1, 2, 0, 255])) self.f.close() a = array('B', b'abcdefgh') with self.FileIO(TESTFN, 'r') as f: n = f.readinto(a) self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104])) self.assertEqual(n, 4) a = array('b', b'abcdefgh') with self.FileIO(TESTFN, 'r') as f: n = f.readinto(a) self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104])) self.assertEqual(n, 4) a = array('I', b'abcdefgh') with self.FileIO(TESTFN, 'r') as f: n = f.readinto(a) self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh')) self.assertEqual(n, 4) def testWritelinesList(self): l = [b'123', b'456'] self.f.writelines(l) self.f.close() self.f = self.FileIO(TESTFN, 'rb') buf = self.f.read() self.assertEqual(buf, b'123456') def testWritelinesUserList(self): l = UserList([b'123', b'456']) self.f.writelines(l) self.f.close() self.f = self.FileIO(TESTFN, 'rb') buf = self.f.read() self.assertEqual(buf, b'123456') def testWritelinesError(self): self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) self.assertRaises(TypeError, self.f.writelines, None) self.assertRaises(TypeError, self.f.writelines, "abc") def test_none_args(self): self.f.write(b"hi\nbye\nabc") self.f.close() self.f = self.FileIO(TESTFN, 'r') self.assertEqual(self.f.read(None), b"hi\nbye\nabc") self.f.seek(0) self.assertEqual(self.f.readline(None), b"hi\n") self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) def test_reject(self): self.assertRaises(TypeError, self.f.write, "Hello!") def testRepr(self): self.assertEqual(repr(self.f), "<%s.FileIO name=%r mode=%r closefd=True>" % (self.modulename, self.f.name, self.f.mode)) del self.f.name self.assertEqual(repr(self.f), "<%s.FileIO fd=%r mode=%r closefd=True>" % (self.modulename, self.f.fileno(), self.f.mode)) self.f.close() self.assertEqual(repr(self.f), "<%s.FileIO [closed]>" % (self.modulename,)) def test_subclass_repr(self): class TestSubclass(self.FileIO): pass f = TestSubclass(TESTFN) with f: self.assertIn(TestSubclass.__name__, repr(f)) self.assertIn(TestSubclass.__name__, repr(f)) def testReprNoCloseFD(self): fd = os.open(TESTFN, os.O_RDONLY) try: with self.FileIO(fd, 'r', closefd=False) as f: self.assertEqual(repr(f), "<%s.FileIO name=%r mode=%r closefd=False>" % (self.modulename, f.name, f.mode)) finally: os.close(fd) @infinite_recursion(25) def testRecursiveRepr(self): # Issue #25455 with swap_attr(self.f, 'name', self.f): with self.assertRaises(RuntimeError): repr(self.f) # Should not crash def testErrors(self): f = self.f self.assertFalse(f.isatty()) self.assertFalse(f.closed) #self.assertEqual(f.name, TESTFN) self.assertRaises(ValueError, f.read, 10) # Open for reading f.close() self.assertTrue(f.closed) f = self.FileIO(TESTFN, 'r') self.assertRaises(TypeError, f.readinto, "") self.assertFalse(f.closed) f.close() self.assertTrue(f.closed) def testMethods(self): methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable', 'read', 'readall', 'readline', 'readlines', 'tell', 'truncate', 'flush'] self.f.close() self.assertTrue(self.f.closed) for methodname in methods: method = getattr(self.f, methodname) # should raise on closed file self.assertRaises(ValueError, method) self.assertRaises(TypeError, self.f.readinto) self.assertRaises(ValueError, self.f.readinto, bytearray(1)) self.assertRaises(TypeError, self.f.seek) self.assertRaises(ValueError, self.f.seek, 0) self.assertRaises(TypeError, self.f.write) self.assertRaises(ValueError, self.f.write, b'') self.assertRaises(TypeError, self.f.writelines) self.assertRaises(ValueError, self.f.writelines, b'') def testOpendir(self): # Issue 3703: opening a directory should fill the errno # Windows always returns "[Errno 13]: Permission denied # Unix uses fstat and returns "[Errno 21]: Is a directory" try: self.FileIO('.', 'r') except OSError as e: self.assertNotEqual(e.errno, 0) self.assertEqual(e.filename, ".") else: self.fail("Should have raised OSError") @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") def testOpenDirFD(self): fd = os.open('.', os.O_RDONLY) with self.assertRaises(OSError) as cm: self.FileIO(fd, 'r') os.close(fd) self.assertEqual(cm.exception.errno, errno.EISDIR) #A set of functions testing that we get expected behaviour if someone has #manually closed the internal file descriptor. First, a decorator: def ClosedFD(func): @wraps(func) def wrapper(self): #forcibly close the fd before invoking the problem function f = self.f os.close(f.fileno()) try: func(self, f) finally: try: self.f.close() except OSError: pass return wrapper def ClosedFDRaises(func): @wraps(func) def wrapper(self): #forcibly close the fd before invoking the problem function f = self.f os.close(f.fileno()) try: func(self, f) except OSError as e: self.assertEqual(e.errno, errno.EBADF) else: self.fail("Should have raised OSError") finally: try: self.f.close() except OSError: pass return wrapper @ClosedFDRaises def testErrnoOnClose(self, f): f.close() @ClosedFDRaises def testErrnoOnClosedWrite(self, f): f.write(b'a') @ClosedFDRaises def testErrnoOnClosedSeek(self, f): f.seek(0) @ClosedFDRaises def testErrnoOnClosedTell(self, f): f.tell() @ClosedFDRaises def testErrnoOnClosedTruncate(self, f): f.truncate(0) @ClosedFD def testErrnoOnClosedSeekable(self, f): f.seekable() @ClosedFD def testErrnoOnClosedReadable(self, f): f.readable() @ClosedFD def testErrnoOnClosedWritable(self, f): f.writable() @ClosedFD def testErrnoOnClosedFileno(self, f): f.fileno() @ClosedFD def testErrnoOnClosedIsatty(self, f): self.assertEqual(f.isatty(), False) def ReopenForRead(self): try: self.f.close() except OSError: pass self.f = self.FileIO(TESTFN, 'r') os.close(self.f.fileno()) return self.f @ClosedFDRaises def testErrnoOnClosedRead(self, f): f = self.ReopenForRead() f.read(1) @ClosedFDRaises def testErrnoOnClosedReadall(self, f): f = self.ReopenForRead() f.readall() @ClosedFDRaises def testErrnoOnClosedReadinto(self, f): f = self.ReopenForRead() a = array('b', b'x'*10) f.readinto(a) @strace_helper.requires_strace() def test_syscalls_read(self): """Check that the set of system calls produced by the I/O stack is what is expected for various read cases. It's expected as bits of the I/O implementation change, this will need to change. The goal is to catch changes that unintentionally add additional systemcalls (ex. additional calls have been looked at in bpo-21679 and gh-120754). """ self.f.write(b"Hello, World!") self.f.close() def check_readall(name, code, prelude="", cleanup="", extra_checks=None): with self.subTest(name=name): syscalls = strace_helper.get_events(code, _strace_flags, prelude=prelude, cleanup=cleanup) # The first call should be an open that returns a # file descriptor (fd). Afer that calls may vary. Once the file # is opened, check calls refer to it by fd as the filename # could be removed from the filesystem, renamed, etc. See: # Time-of-check time-of-use (TOCTOU) software bug class. # # There are a number of related but distinct open system calls # so not checking precise name here. self.assertGreater( len(syscalls), 1, f"Should have had at least an open call|calls={syscalls}") fd_str = syscalls[0].returncode # All other calls should contain the fd in their argument set. for ev in syscalls[1:]: self.assertIn( fd_str, ev.args, f"Looking for file descriptor in arguments|ev={ev}" ) # There are a number of related syscalls used to implement # behaviors in a libc (ex. fstat, newfstatat, statx, open, openat). # Allow any that use the same substring. def count_similarname(name): return len([ev for ev in syscalls if name in ev.syscall]) checks = [ # Should open and close the file exactly once ("open", 1), ("close", 1), # There should no longer be an isatty call (All files being # tested are block devices / not character devices). ('ioctl', 0), # Should only have one fstat (bpo-21679, gh-120754) # note: It's important this uses a fd rather than filename, # That is validated by the `fd` check above. # note: fstat, newfstatat, and statx have all been observed # here in the underlying C library implementations. ("stat", 1) ] if extra_checks: checks += extra_checks for call, count in checks: self.assertEqual( count_similarname(call), count, msg=f"call={call}|count={count}|syscalls={syscalls}" ) # "open, read, close" file using different common patterns. check_readall( "open builtin with default options", f""" f = open('{TESTFN}') f.read() f.close() """ ) check_readall( "open in binary mode", f""" f = open('{TESTFN}', 'rb') f.read() f.close() """ ) check_readall( "open in text mode", f""" f = open('{TESTFN}', 'rt') f.read() f.close() """, # GH-122111: read_text uses BufferedIO which requires looking up # position in file. `read_bytes` disables that buffering and avoids # these calls which is tested the `pathlib read_bytes` case. extra_checks=[("seek", 1)] ) check_readall( "pathlib read_bytes", "p.read_bytes()", prelude=f"""from pathlib import Path; p = Path("{TESTFN}")""", # GH-122111: Buffering is disabled so these calls are avoided. extra_checks=[("seek", 0)] ) check_readall( "pathlib read_text", "p.read_text()", prelude=f"""from pathlib import Path; p = Path("{TESTFN}")""" ) # Focus on just `read()`. calls = strace_helper.get_syscalls( prelude=f"f = open('{TESTFN}')", code="f.read()", cleanup="f.close()", strace_flags=_strace_flags ) # One to read all the bytes # One to read the EOF and get a size 0 return. self.assertEqual(calls.count("read"), 2) class CAutoFileTests(AutoFileTests, unittest.TestCase): FileIO = _io.FileIO modulename = '_io' class PyAutoFileTests(AutoFileTests, unittest.TestCase): FileIO = _pyio.FileIO modulename = '_pyio' class OtherFileTests: def testAbles(self): try: f = self.FileIO(TESTFN, "w") self.assertEqual(f.readable(), False) self.assertEqual(f.writable(), True) self.assertEqual(f.seekable(), True) f.close() f = self.FileIO(TESTFN, "r") self.assertEqual(f.readable(), True) self.assertEqual(f.writable(), False) self.assertEqual(f.seekable(), True) f.close() f = self.FileIO(TESTFN, "a+") self.assertEqual(f.readable(), True) self.assertEqual(f.writable(), True) self.assertEqual(f.seekable(), True) self.assertEqual(f.isatty(), False) f.close() if sys.platform != "win32" and not is_emscripten: try: f = self.FileIO("/dev/tty", "a") except OSError: # When run in a cron job there just aren't any # ttys, so skip the test. This also handles other # OS'es that don't support /dev/tty. pass else: self.assertEqual(f.readable(), False) self.assertEqual(f.writable(), True) if sys.platform != "darwin" and \ 'bsd' not in sys.platform and \ not sys.platform.startswith(('sunos', 'aix')): # Somehow /dev/tty appears seekable on some BSDs self.assertEqual(f.seekable(), False) self.assertEqual(f.isatty(), True) f.close() finally: os.unlink(TESTFN) def testInvalidModeStrings(self): # check invalid mode strings for mode in ("", "aU", "wU+", "rw", "rt"): try: f = self.FileIO(TESTFN, mode) except ValueError: pass else: f.close() self.fail('%r is an invalid file mode' % mode) def testModeStrings(self): # test that the mode attribute is correct for various mode strings # given as init args try: for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'), ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'), ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'), ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]: # read modes are last so that TESTFN will exist first with self.FileIO(TESTFN, modes[0]) as f: self.assertEqual(f.mode, modes[1]) finally: if os.path.exists(TESTFN): os.unlink(TESTFN) def testUnicodeOpen(self): # verify repr works for unicode too f = self.FileIO(str(TESTFN), "w") f.close() os.unlink(TESTFN) def testBytesOpen(self): # Opening a bytes filename fn = TESTFN_ASCII.encode("ascii") f = self.FileIO(fn, "w") try: f.write(b"abc") f.close() with open(TESTFN_ASCII, "rb") as f: self.assertEqual(f.read(), b"abc") finally: os.unlink(TESTFN_ASCII) @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8', "test only works for utf-8 filesystems") def testUtf8BytesOpen(self): # Opening a UTF-8 bytes filename try: fn = TESTFN_UNICODE.encode("utf-8") except UnicodeEncodeError: self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE) f = self.FileIO(fn, "w") try: f.write(b"abc") f.close() with open(TESTFN_UNICODE, "rb") as f: self.assertEqual(f.read(), b"abc") finally: os.unlink(TESTFN_UNICODE) def testConstructorHandlesNULChars(self): fn_with_NUL = 'foo\0bar' self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w') self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w') def testInvalidFd(self): self.assertRaises(ValueError, self.FileIO, -10) self.assertRaises(OSError, self.FileIO, make_bad_fd()) if sys.platform == 'win32': import msvcrt self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd()) def testBooleanFd(self): for fd in False, True: with self.assertWarnsRegex(RuntimeWarning, 'bool is used as a file descriptor') as cm: f = self.FileIO(fd, closefd=False) f.close() self.assertEqual(cm.filename, __file__) def testBadModeArgument(self): # verify that we get a sensible error message for bad mode argument bad_mode = "qwerty" try: f = self.FileIO(TESTFN, bad_mode) except ValueError as msg: if msg.args[0] != 0: s = str(msg) if TESTFN in s or bad_mode not in s: self.fail("bad error message for invalid mode: %s" % s) # if msg.args[0] == 0, we're probably on Windows where there may be # no obvious way to discover why open() failed. else: f.close() self.fail("no error for invalid mode: %s" % bad_mode) def testTruncate(self): f = self.FileIO(TESTFN, 'w') f.write(bytes(bytearray(range(10)))) self.assertEqual(f.tell(), 10) f.truncate(5) self.assertEqual(f.tell(), 10) self.assertEqual(f.seek(0, io.SEEK_END), 5) f.truncate(15) self.assertEqual(f.tell(), 5) self.assertEqual(f.seek(0, io.SEEK_END), 15) f.close() def testTruncateOnWindows(self): def bug801631(): # SF bug # "file.truncate fault on windows" f = self.FileIO(TESTFN, 'w') f.write(bytes(range(11))) f.close() f = self.FileIO(TESTFN,'r+') data = f.read(5) if data != bytes(range(5)): self.fail("Read on file opened for update failed %r" % data) if f.tell() != 5: self.fail("File pos after read wrong %d" % f.tell()) f.truncate() if f.tell() != 5: self.fail("File pos after ftruncate wrong %d" % f.tell()) f.close() size = os.path.getsize(TESTFN) if size != 5: self.fail("File size after ftruncate wrong %d" % size) try: bug801631() finally: os.unlink(TESTFN) def testAppend(self): try: f = open(TESTFN, 'wb') f.write(b'spam') f.close() f = open(TESTFN, 'ab') f.write(b'eggs') f.close() f = open(TESTFN, 'rb') d = f.read() f.close() self.assertEqual(d, b'spameggs') finally: try: os.unlink(TESTFN) except: pass def testInvalidInit(self): self.assertRaises(TypeError, self.FileIO, "1", 0, 0) def testWarnings(self): with check_warnings(quiet=True) as w: self.assertEqual(w.warnings, []) self.assertRaises(TypeError, self.FileIO, []) self.assertEqual(w.warnings, []) self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt") self.assertEqual(w.warnings, []) def testUnclosedFDOnException(self): class MyException(Exception): pass class MyFileIO(self.FileIO): def __setattr__(self, name, value): if name == "name": raise MyException("blocked setting name") return super(MyFileIO, self).__setattr__(name, value) fd = os.open(__file__, os.O_RDONLY) self.assertRaises(MyException, MyFileIO, fd) os.close(fd) # should not raise OSError(EBADF) class COtherFileTests(OtherFileTests, unittest.TestCase): FileIO = _io.FileIO modulename = '_io' @cpython_only def testInvalidFd_overflow(self): # Issue 15989 _testcapi = import_module("_testcapi") self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1) self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1) def test_open_code(self): # Check that the default behaviour of open_code matches # open("rb") with self.FileIO(__file__, "rb") as f: expected = f.read() with _io.open_code(__file__) as f: actual = f.read() self.assertEqual(expected, actual) class PyOtherFileTests(OtherFileTests, unittest.TestCase): FileIO = _pyio.FileIO modulename = '_pyio' def test_open_code(self): # Check that the default behaviour of open_code matches # open("rb") with self.FileIO(__file__, "rb") as f: expected = f.read() with check_warnings(quiet=True) as w: # Always test _open_code_with_warning with _pyio._open_code_with_warning(__file__) as f: actual = f.read() self.assertEqual(expected, actual) self.assertNotEqual(w.warnings, []) def tearDownModule(): # Historically, these tests have been sloppy about removing TESTFN. # So get rid of it no matter what. if os.path.exists(TESTFN): os.unlink(TESTFN) if __name__ == '__main__': unittest.main()