from collections import namedtuple import contextlib import sys from textwrap import dedent import threading import time import unittest from test.support import import_helper _channels = import_helper.import_module('_interpchannels') from test.support.interpreters import _crossinterp from test.test__interpreters import ( _interpreters, _run_output, clean_up_interpreters, ) REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND] # Additional tests are found in Lib/test/test_interpreters/test_channels.py. # New tests should be added there. # XXX The tests here should be moved there. See the note under LowLevelTests. ################################## # helpers def recv_wait(cid): while True: try: obj, unboundop = _channels.recv(cid) except _channels.ChannelEmptyError: time.sleep(0.1) else: assert unboundop is None, repr(unboundop) return obj def recv_nowait(cid, *args, unbound=False): obj, unboundop = _channels.recv(cid, *args) assert (unboundop is None) != unbound, repr(unboundop) return obj #@contextmanager #def run_threaded(id, source, **shared): # def run(): # run_interp(id, source, **shared) # t = threading.Thread(target=run) # t.start() # yield # t.join() def run_interp(id, source, **shared): _run_interp(id, source, shared) def _run_interp(id, source, shared, _mainns={}): source = dedent(source) main, *_ = _interpreters.get_main() if main == id: cur, *_ = _interpreters.get_current() if cur != main: raise RuntimeError # XXX Run a func? 
exec(source, _mainns) else: _interpreters.run_string(id, source, shared) class Interpreter(namedtuple('Interpreter', 'name id')): @classmethod def from_raw(cls, raw): if isinstance(raw, cls): return raw elif isinstance(raw, str): return cls(raw) else: raise NotImplementedError def __new__(cls, name=None, id=None): main, *_ = _interpreters.get_main() if id == main: if not name: name = 'main' elif name != 'main': raise ValueError( 'name mismatch (expected "main", got "{}")'.format(name)) id = main elif id is not None: if not name: name = 'interp' elif name == 'main': raise ValueError('name mismatch (unexpected "main")') assert isinstance(id, int), repr(id) elif not name or name == 'main': name = 'main' id = main else: id = _interpreters.create() self = super().__new__(cls, name, id) return self # XXX expect_channel_closed() is unnecessary once we improve exc propagation. @contextlib.contextmanager def expect_channel_closed(): try: yield except _channels.ChannelClosedError: pass else: assert False, 'channel not closed' class ChannelAction(namedtuple('ChannelAction', 'action end interp')): def __new__(cls, action, end=None, interp=None): if not end: end = 'both' if not interp: interp = 'main' self = super().__new__(cls, action, end, interp) return self def __init__(self, *args, **kwargs): if self.action == 'use': if self.end not in ('same', 'opposite', 'send', 'recv'): raise ValueError(self.end) elif self.action in ('close', 'force-close'): if self.end not in ('both', 'same', 'opposite', 'send', 'recv'): raise ValueError(self.end) else: raise ValueError(self.action) if self.interp not in ('main', 'same', 'other', 'extra'): raise ValueError(self.interp) def resolve_end(self, end): if self.end == 'same': return end elif self.end == 'opposite': return 'recv' if end == 'send' else 'send' else: return self.end def resolve_interp(self, interp, other, extra): if self.interp == 'same': return interp elif self.interp == 'other': if other is None: raise RuntimeError return other 
elif self.interp == 'extra': if extra is None: raise RuntimeError return extra elif self.interp == 'main': if interp.name == 'main': return interp elif other and other.name == 'main': return other else: raise RuntimeError # Per __init__(), there aren't any others. class ChannelState(namedtuple('ChannelState', 'pending closed')): def __new__(cls, pending=0, *, closed=False): self = super().__new__(cls, pending, closed) return self def incr(self): return type(self)(self.pending + 1, closed=self.closed) def decr(self): return type(self)(self.pending - 1, closed=self.closed) def close(self, *, force=True): if self.closed: if not force or self.pending == 0: return self return type(self)(0 if force else self.pending, closed=True) def run_action(cid, action, end, state, *, hideclosed=True): if state.closed: if action == 'use' and end == 'recv' and state.pending: expectfail = False else: expectfail = True else: expectfail = False try: result = _run_action(cid, action, end, state) except _channels.ChannelClosedError: if not hideclosed and not expectfail: raise result = state.close() else: if expectfail: raise ... 
# XXX return result def _run_action(cid, action, end, state): if action == 'use': if end == 'send': _channels.send(cid, b'spam', blocking=False) return state.incr() elif end == 'recv': if not state.pending: try: _channels.recv(cid) except _channels.ChannelEmptyError: return state else: raise Exception('expected ChannelEmptyError') else: recv_nowait(cid) return state.decr() else: raise ValueError(end) elif action == 'close': kwargs = {} if end in ('recv', 'send'): kwargs[end] = True _channels.close(cid, **kwargs) return state.close() elif action == 'force-close': kwargs = { 'force': True, } if end in ('recv', 'send'): kwargs[end] = True _channels.close(cid, **kwargs) return state.close(force=True) else: raise ValueError(action) def clean_up_channels(): for cid, _ in _channels.list_all(): try: _channels.destroy(cid) except _channels.ChannelNotFoundError: pass # already destroyed class TestBase(unittest.TestCase): def tearDown(self): clean_up_channels() clean_up_interpreters() ################################## # channel tests class ChannelIDTests(TestBase): def test_default_kwargs(self): cid = _channels._channel_id(10, force=True) self.assertEqual(int(cid), 10) self.assertEqual(cid.end, 'both') def test_with_kwargs(self): cid = _channels._channel_id(10, send=True, force=True) self.assertEqual(cid.end, 'send') cid = _channels._channel_id(10, send=True, recv=False, force=True) self.assertEqual(cid.end, 'send') cid = _channels._channel_id(10, recv=True, force=True) self.assertEqual(cid.end, 'recv') cid = _channels._channel_id(10, recv=True, send=False, force=True) self.assertEqual(cid.end, 'recv') cid = _channels._channel_id(10, send=True, recv=True, force=True) self.assertEqual(cid.end, 'both') def test_coerce_id(self): class Int(str): def __index__(self): return 10 cid = _channels._channel_id(Int(), force=True) self.assertEqual(int(cid), 10) def test_bad_id(self): self.assertRaises(TypeError, _channels._channel_id, object()) self.assertRaises(TypeError, 
_channels._channel_id, 10.0) self.assertRaises(TypeError, _channels._channel_id, '10') self.assertRaises(TypeError, _channels._channel_id, b'10') self.assertRaises(ValueError, _channels._channel_id, -1) self.assertRaises(OverflowError, _channels._channel_id, 2**64) def test_bad_kwargs(self): with self.assertRaises(ValueError): _channels._channel_id(10, send=False, recv=False) def test_does_not_exist(self): cid = _channels.create(REPLACE) with self.assertRaises(_channels.ChannelNotFoundError): _channels._channel_id(int(cid) + 1) # unforced def test_str(self): cid = _channels._channel_id(10, force=True) self.assertEqual(str(cid), '10') def test_repr(self): cid = _channels._channel_id(10, force=True) self.assertEqual(repr(cid), 'ChannelID(10)') cid = _channels._channel_id(10, send=True, force=True) self.assertEqual(repr(cid), 'ChannelID(10, send=True)') cid = _channels._channel_id(10, recv=True, force=True) self.assertEqual(repr(cid), 'ChannelID(10, recv=True)') cid = _channels._channel_id(10, send=True, recv=True, force=True) self.assertEqual(repr(cid), 'ChannelID(10)') def test_equality(self): cid1 = _channels.create(REPLACE) cid2 = _channels._channel_id(int(cid1)) cid3 = _channels.create(REPLACE) self.assertTrue(cid1 == cid1) self.assertTrue(cid1 == cid2) self.assertTrue(cid1 == int(cid1)) self.assertTrue(int(cid1) == cid1) self.assertTrue(cid1 == float(int(cid1))) self.assertTrue(float(int(cid1)) == cid1) self.assertFalse(cid1 == float(int(cid1)) + 0.1) self.assertFalse(cid1 == str(int(cid1))) self.assertFalse(cid1 == 2**1000) self.assertFalse(cid1 == float('inf')) self.assertFalse(cid1 == 'spam') self.assertFalse(cid1 == cid3) self.assertFalse(cid1 != cid1) self.assertFalse(cid1 != cid2) self.assertTrue(cid1 != cid3) def test_shareable(self): chan = _channels.create(REPLACE) obj = _channels.create(REPLACE) _channels.send(chan, obj, blocking=False) got = recv_nowait(chan) self.assertEqual(got, obj) self.assertIs(type(got), type(obj)) # XXX Check the following in 
the channel tests? #self.assertIsNot(got, obj) class ChannelTests(TestBase): def test_create_cid(self): cid = _channels.create(REPLACE) self.assertIsInstance(cid, _channels.ChannelID) def test_sequential_ids(self): before = [cid for cid, _ in _channels.list_all()] id1 = _channels.create(REPLACE) id2 = _channels.create(REPLACE) id3 = _channels.create(REPLACE) after = [cid for cid, _ in _channels.list_all()] self.assertEqual(id2, int(id1) + 1) self.assertEqual(id3, int(id2) + 1) self.assertEqual(set(after) - set(before), {id1, id2, id3}) def test_ids_global(self): id1 = _interpreters.create() out = _run_output(id1, dedent(""" import _interpchannels as _channels cid = _channels.create(3) print(cid) """)) cid1 = int(out.strip()) id2 = _interpreters.create() out = _run_output(id2, dedent(""" import _interpchannels as _channels cid = _channels.create(3) print(cid) """)) cid2 = int(out.strip()) self.assertEqual(cid2, int(cid1) + 1) def test_channel_list_interpreters_none(self): """Test listing interpreters for a channel with no associations.""" # Test for channel with no associated _interpreters. cid = _channels.create(REPLACE) send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(send_interps, []) self.assertEqual(recv_interps, []) def test_channel_list_interpreters_basic(self): """Test basic listing channel _interpreters.""" interp0, *_ = _interpreters.get_main() cid = _channels.create(REPLACE) _channels.send(cid, "send", blocking=False) # Test for a channel that has one end associated to an interpreter. 
send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(send_interps, [interp0]) self.assertEqual(recv_interps, []) interp1 = _interpreters.create() _run_output(interp1, dedent(f""" import _interpchannels as _channels _channels.recv({cid}) """)) # Test for channel that has both ends associated to an interpreter. send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(send_interps, [interp0]) self.assertEqual(recv_interps, [interp1]) def test_channel_list_interpreters_multiple(self): """Test listing interpreters for a channel with many associations.""" interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() interp2 = _interpreters.create() interp3 = _interpreters.create() cid = _channels.create(REPLACE) _channels.send(cid, "send", blocking=False) _run_output(interp1, dedent(f""" import _interpchannels as _channels _channels.send({cid}, "send", blocking=False) """)) _run_output(interp2, dedent(f""" import _interpchannels as _channels _channels.recv({cid}) """)) _run_output(interp3, dedent(f""" import _interpchannels as _channels _channels.recv({cid}) """)) send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(set(send_interps), {interp0, interp1}) self.assertEqual(set(recv_interps), {interp2, interp3}) def test_channel_list_interpreters_destroyed(self): """Test listing channel interpreters with a destroyed interpreter.""" interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() cid = _channels.create(REPLACE) _channels.send(cid, "send", blocking=False) _run_output(interp1, dedent(f""" import _interpchannels as _channels _channels.recv({cid}) """)) # Should be one interpreter associated with each end. 
send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(send_interps, [interp0]) self.assertEqual(recv_interps, [interp1]) _interpreters.destroy(interp1) # Destroyed interpreter should not be listed. send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(send_interps, [interp0]) self.assertEqual(recv_interps, []) def test_channel_list_interpreters_released(self): """Test listing channel interpreters with a released channel.""" # Set up one channel with main interpreter on the send end and two # subinterpreters on the receive end. interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() interp2 = _interpreters.create() cid = _channels.create(REPLACE) _channels.send(cid, "data", blocking=False) _run_output(interp1, dedent(f""" import _interpchannels as _channels _channels.recv({cid}) """)) _channels.send(cid, "data", blocking=False) _run_output(interp2, dedent(f""" import _interpchannels as _channels _channels.recv({cid}) """)) # Check the setup. send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(len(send_interps), 1) self.assertEqual(len(recv_interps), 2) # Release the main interpreter from the send end. _channels.release(cid, send=True) # Send end should have no associated _interpreters. send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(len(send_interps), 0) self.assertEqual(len(recv_interps), 2) # Release one of the subinterpreters from the receive end. _run_output(interp2, dedent(f""" import _interpchannels as _channels _channels.release({cid}) """)) # Receive end should have the released interpreter removed. 
send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(len(send_interps), 0) self.assertEqual(recv_interps, [interp1]) def test_channel_list_interpreters_closed(self): """Test listing channel interpreters with a closed channel.""" interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() cid = _channels.create(REPLACE) # Put something in the channel so that it's not empty. _channels.send(cid, "send", blocking=False) # Check initial state. send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(len(send_interps), 1) self.assertEqual(len(recv_interps), 0) # Force close the channel. _channels.close(cid, force=True) # Both ends should raise an error. with self.assertRaises(_channels.ChannelClosedError): _channels.list_interpreters(cid, send=True) with self.assertRaises(_channels.ChannelClosedError): _channels.list_interpreters(cid, send=False) def test_channel_list_interpreters_closed_send_end(self): """Test listing channel interpreters with a channel's send end closed.""" interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() cid = _channels.create(REPLACE) # Put something in the channel so that it's not empty. _channels.send(cid, "send", blocking=False) # Check initial state. send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(len(send_interps), 1) self.assertEqual(len(recv_interps), 0) # Close the send end of the channel. _channels.close(cid, send=True) # Send end should raise an error. with self.assertRaises(_channels.ChannelClosedError): _channels.list_interpreters(cid, send=True) # Receive end should not be closed (since channel is not empty). 
recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(len(recv_interps), 0) # Close the receive end of the channel from a subinterpreter. _run_output(interp1, dedent(f""" import _interpchannels as _channels _channels.close({cid}, force=True) """)) return # Both ends should raise an error. with self.assertRaises(_channels.ChannelClosedError): _channels.list_interpreters(cid, send=True) with self.assertRaises(_channels.ChannelClosedError): _channels.list_interpreters(cid, send=False) def test_allowed_types(self): cid = _channels.create(REPLACE) objects = [ None, 'spam', b'spam', 42, ] for obj in objects: with self.subTest(obj): _channels.send(cid, obj, blocking=False) got = recv_nowait(cid) self.assertEqual(got, obj) self.assertIs(type(got), type(obj)) # XXX Check the following? #self.assertIsNot(got, obj) # XXX What about between interpreters? def test_run_string_arg_unresolved(self): cid = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.set___main___attrs(interp, dict(cid=cid.send)) out = _run_output(interp, dedent(""" import _interpchannels as _channels print(cid.end) _channels.send(cid, b'spam', blocking=False) """)) obj = recv_nowait(cid) self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') # XXX For now there is no high-level channel into which the # sent channel ID can be converted... # Note: this test caused crashes on some buildbots (bpo-33615). 
@unittest.skip('disabled until high-level channels exist') def test_run_string_arg_resolved(self): cid = _channels.create(REPLACE) cid = _channels._channel_id(cid, _resolve=True) interp = _interpreters.create() out = _run_output(interp, dedent(""" import _interpchannels as _channels print(chan.id.end) _channels.send(chan.id, b'spam', blocking=False) """), dict(chan=cid.send)) obj = recv_nowait(cid) self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') #------------------- # send/recv def test_send_recv_main(self): cid = _channels.create(REPLACE) orig = b'spam' _channels.send(cid, orig, blocking=False) obj = recv_nowait(cid) self.assertEqual(obj, orig) self.assertIsNot(obj, orig) def test_send_recv_same_interpreter(self): id1 = _interpreters.create() out = _run_output(id1, dedent(""" import _interpchannels as _channels cid = _channels.create(REPLACE) orig = b'spam' _channels.send(cid, orig, blocking=False) obj, _ = _channels.recv(cid) assert obj is not orig assert obj == orig """)) def test_send_recv_different_interpreters(self): cid = _channels.create(REPLACE) id1 = _interpreters.create() out = _run_output(id1, dedent(f""" import _interpchannels as _channels _channels.send({cid}, b'spam', blocking=False) """)) obj = recv_nowait(cid) self.assertEqual(obj, b'spam') def test_send_recv_different_threads(self): cid = _channels.create(REPLACE) def f(): obj = recv_wait(cid) _channels.send(cid, obj) t = threading.Thread(target=f) t.start() _channels.send(cid, b'spam') obj = recv_wait(cid) t.join() self.assertEqual(obj, b'spam') def test_send_recv_different_interpreters_and_threads(self): cid = _channels.create(REPLACE) id1 = _interpreters.create() out = None def f(): nonlocal out out = _run_output(id1, dedent(f""" import time import _interpchannels as _channels while True: try: obj, _ = _channels.recv({cid}) break except _channels.ChannelEmptyError: time.sleep(0.1) assert(obj == b'spam') _channels.send({cid}, b'eggs') """)) t = threading.Thread(target=f) 
t.start() _channels.send(cid, b'spam') obj = recv_wait(cid) t.join() self.assertEqual(obj, b'eggs') def test_send_not_found(self): with self.assertRaises(_channels.ChannelNotFoundError): _channels.send(10, b'spam') def test_recv_not_found(self): with self.assertRaises(_channels.ChannelNotFoundError): _channels.recv(10) def test_recv_empty(self): cid = _channels.create(REPLACE) with self.assertRaises(_channels.ChannelEmptyError): _channels.recv(cid) def test_recv_default(self): default = object() cid = _channels.create(REPLACE) obj1 = recv_nowait(cid, default) _channels.send(cid, None, blocking=False) _channels.send(cid, 1, blocking=False) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'eggs', blocking=False) obj2 = recv_nowait(cid, default) obj3 = recv_nowait(cid, default) obj4 = recv_nowait(cid) obj5 = recv_nowait(cid, default) obj6 = recv_nowait(cid, default) self.assertIs(obj1, default) self.assertIs(obj2, None) self.assertEqual(obj3, 1) self.assertEqual(obj4, b'spam') self.assertEqual(obj5, b'eggs') self.assertIs(obj6, default) def test_recv_sending_interp_destroyed(self): with self.subTest('closed'): cid1 = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels _channels.send({cid1}, b'spam', blocking=False) """)) _interpreters.destroy(interp) with self.assertRaisesRegex(RuntimeError, f'channel {cid1} is closed'): _channels.recv(cid1) del cid1 with self.subTest('still open'): cid2 = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels _channels.send({cid2}, b'spam', blocking=False) """)) _channels.send(cid2, b'eggs', blocking=False) _interpreters.destroy(interp) recv_nowait(cid2, unbound=True) recv_nowait(cid2, unbound=False) with self.assertRaisesRegex(RuntimeError, f'channel {cid2} is empty'): _channels.recv(cid2) del cid2 #------------------- # send_buffer def 
test_send_buffer(self): buf = bytearray(b'spamspamspam') cid = _channels.create(REPLACE) _channels.send_buffer(cid, buf, blocking=False) obj = recv_nowait(cid) self.assertIsNot(obj, buf) self.assertIsInstance(obj, memoryview) self.assertEqual(obj, buf) buf[4:8] = b'eggs' self.assertEqual(obj, buf) obj[4:8] = b'ham.' self.assertEqual(obj, buf) #------------------- # send with waiting def build_send_waiter(self, obj, *, buffer=False): # We want a long enough sleep that send() actually has to wait. if buffer: send = _channels.send_buffer else: send = _channels.send cid = _channels.create(REPLACE) try: started = time.monotonic() send(cid, obj, blocking=False) stopped = time.monotonic() recv_nowait(cid) finally: _channels.destroy(cid) delay = stopped - started # seconds delay *= 3 def wait(): time.sleep(delay) return wait def test_send_blocking_waiting(self): received = None obj = b'spam' wait = self.build_send_waiter(obj) cid = _channels.create(REPLACE) def f(): nonlocal received wait() received = recv_wait(cid) t = threading.Thread(target=f) t.start() _channels.send(cid, obj, blocking=True) t.join() self.assertEqual(received, obj) def test_send_buffer_blocking_waiting(self): received = None obj = bytearray(b'spam') wait = self.build_send_waiter(obj, buffer=True) cid = _channels.create(REPLACE) def f(): nonlocal received wait() received = recv_wait(cid) t = threading.Thread(target=f) t.start() _channels.send_buffer(cid, obj, blocking=True) t.join() self.assertEqual(received, obj) def test_send_blocking_no_wait(self): received = None obj = b'spam' cid = _channels.create(REPLACE) def f(): nonlocal received received = recv_wait(cid) t = threading.Thread(target=f) t.start() _channels.send(cid, obj, blocking=True) t.join() self.assertEqual(received, obj) def test_send_buffer_blocking_no_wait(self): received = None obj = bytearray(b'spam') cid = _channels.create(REPLACE) def f(): nonlocal received received = recv_wait(cid) t = threading.Thread(target=f) t.start() 
_channels.send_buffer(cid, obj, blocking=True) t.join() self.assertEqual(received, obj) def test_send_timeout(self): obj = b'spam' with self.subTest('non-blocking with timeout'): cid = _channels.create(REPLACE) with self.assertRaises(ValueError): _channels.send(cid, obj, blocking=False, timeout=0.1) with self.subTest('timeout hit'): cid = _channels.create(REPLACE) with self.assertRaises(TimeoutError): _channels.send(cid, obj, blocking=True, timeout=0.1) with self.assertRaises(_channels.ChannelEmptyError): received = recv_nowait(cid) print(repr(received)) with self.subTest('timeout not hit'): cid = _channels.create(REPLACE) def f(): recv_wait(cid) t = threading.Thread(target=f) t.start() _channels.send(cid, obj, blocking=True, timeout=10) t.join() def test_send_buffer_timeout(self): try: self._has_run_once_timeout except AttributeError: # At the moment, this test leaks a few references. # It looks like the leak originates with the addition # of _channels.send_buffer() (gh-110246), whereas the # tests were added afterward. We want this test even # if the refleak isn't fixed yet, so we skip here. 
raise unittest.SkipTest('temporarily skipped due to refleaks') else: self._has_run_once_timeout = True obj = bytearray(b'spam') with self.subTest('non-blocking with timeout'): cid = _channels.create(REPLACE) with self.assertRaises(ValueError): _channels.send_buffer(cid, obj, blocking=False, timeout=0.1) with self.subTest('timeout hit'): cid = _channels.create(REPLACE) with self.assertRaises(TimeoutError): _channels.send_buffer(cid, obj, blocking=True, timeout=0.1) with self.assertRaises(_channels.ChannelEmptyError): received = recv_nowait(cid) print(repr(received)) with self.subTest('timeout not hit'): cid = _channels.create(REPLACE) def f(): recv_wait(cid) t = threading.Thread(target=f) t.start() _channels.send_buffer(cid, obj, blocking=True, timeout=10) t.join() def test_send_closed_while_waiting(self): obj = b'spam' wait = self.build_send_waiter(obj) with self.subTest('without timeout'): cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) t = threading.Thread(target=f) t.start() with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, obj, blocking=True) t.join() with self.subTest('with timeout'): cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) t = threading.Thread(target=f) t.start() with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, obj, blocking=True, timeout=30) t.join() def test_send_buffer_closed_while_waiting(self): try: self._has_run_once_closed except AttributeError: # At the moment, this test leaks a few references. # It looks like the leak originates with the addition # of _channels.send_buffer() (gh-110246), whereas the # tests were added afterward. We want this test even # if the refleak isn't fixed yet, so we skip here. 
raise unittest.SkipTest('temporarily skipped due to refleaks') else: self._has_run_once_closed = True obj = bytearray(b'spam') wait = self.build_send_waiter(obj, buffer=True) with self.subTest('without timeout'): cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) t = threading.Thread(target=f) t.start() with self.assertRaises(_channels.ChannelClosedError): _channels.send_buffer(cid, obj, blocking=True) t.join() with self.subTest('with timeout'): cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) t = threading.Thread(target=f) t.start() with self.assertRaises(_channels.ChannelClosedError): _channels.send_buffer(cid, obj, blocking=True, timeout=30) t.join() #------------------- # close def test_close_single_user(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) recv_nowait(cid) _channels.close(cid) with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, b'eggs') with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) def test_close_multiple_users(self): cid = _channels.create(REPLACE) id1 = _interpreters.create() id2 = _interpreters.create() _interpreters.run_string(id1, dedent(f""" import _interpchannels as _channels _channels.send({cid}, b'spam', blocking=False) """)) _interpreters.run_string(id2, dedent(f""" import _interpchannels as _channels _channels.recv({cid}) """)) _channels.close(cid) excsnap = _interpreters.run_string(id1, dedent(f""" _channels.send({cid}, b'spam') """)) self.assertEqual(excsnap.type.__name__, 'ChannelClosedError') excsnap = _interpreters.run_string(id2, dedent(f""" _channels.send({cid}, b'spam') """)) self.assertEqual(excsnap.type.__name__, 'ChannelClosedError') def test_close_multiple_times(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) recv_nowait(cid) _channels.close(cid) with self.assertRaises(_channels.ChannelClosedError): _channels.close(cid) def 
test_close_empty(self):
        # Each (send, recv) flag combination is tried on a channel that has
        # been used and fully drained before close() is called.
        tests = [
            (False, False),
            (True, False),
            (False, True),
            (True, True),
            ]
        for send, recv in tests:
            with self.subTest((send, recv)):
                cid = _channels.create(REPLACE)
                _channels.send(cid, b'spam', blocking=False)
                recv_nowait(cid)
                _channels.close(cid, send=send, recv=recv)

                # After the close, both directions must report "closed".
                with self.assertRaises(_channels.ChannelClosedError):
                    _channels.send(cid, b'eggs')
                with self.assertRaises(_channels.ChannelClosedError):
                    _channels.recv(cid)

    def test_close_defaults_with_unused_items(self):
        # close() with default arguments must refuse while items are queued,
        # and the channel must stay usable after the refusal.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'ham', blocking=False)

        with self.assertRaises(_channels.ChannelNotEmptyError):
            _channels.close(cid)
        recv_nowait(cid)
        _channels.send(cid, b'eggs', blocking=False)

    def test_close_recv_with_unused_items_unforced(self):
        # An unforced close(recv=True) is refused while items are queued and
        # succeeds once the queue has been drained.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'ham', blocking=False)

        with self.assertRaises(_channels.ChannelNotEmptyError):
            _channels.close(cid, recv=True)
        recv_nowait(cid)
        _channels.send(cid, b'eggs', blocking=False)
        recv_nowait(cid)
        recv_nowait(cid)
        _channels.close(cid, recv=True)

    def test_close_send_with_unused_items_unforced(self):
        # Closing only the send end is accepted with items still queued:
        # further sends fail, the two queued items can still be received,
        # and only then does recv() report the channel as closed.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'ham', blocking=False)
        _channels.close(cid, send=True)

        with self.assertRaises(_channels.ChannelClosedError):
            _channels.send(cid, b'eggs')
        recv_nowait(cid)
        recv_nowait(cid)
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.recv(cid)

    def test_close_both_with_unused_items_unforced(self):
        # An unforced close of both ends is refused while items are queued;
        # the channel remains usable and can be closed after draining.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'ham', blocking=False)

        with self.assertRaises(_channels.ChannelNotEmptyError):
            _channels.close(cid, recv=True, send=True)
        recv_nowait(cid)
        _channels.send(cid, b'eggs', blocking=False)
        recv_nowait(cid)
        recv_nowait(cid)
        _channels.close(cid, recv=True)

    def test_close_recv_with_unused_items_forced(self):
        # force=True closes despite the two queued items; afterwards both
        # send() and recv() must raise ChannelClosedError.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'ham', blocking=False)
        _channels.close(cid, recv=True, force=True)

        with self.assertRaises(_channels.ChannelClosedError):
            _channels.send(cid, b'eggs')
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.recv(cid)

    def test_close_send_with_unused_items_forced(self):
        # Same expectations as the forced recv-end close, but closing the
        # send end.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'ham', blocking=False)
        _channels.close(cid, send=True, force=True)

        with self.assertRaises(_channels.ChannelClosedError):
            _channels.send(cid, b'eggs')
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.recv(cid)

    def test_close_both_with_unused_items_forced(self):
        # Same expectations again with both ends named explicitly.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'ham', blocking=False)
        _channels.close(cid, send=True, recv=True, force=True)

        with self.assertRaises(_channels.ChannelClosedError):
            _channels.send(cid, b'eggs')
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.recv(cid)

    def test_close_never_used(self):
        # A channel that never carried an item can be closed right away.
        cid = _channels.create(REPLACE)
        _channels.close(cid)

        with self.assertRaises(_channels.ChannelClosedError):
            _channels.send(cid, b'spam')
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.recv(cid)

    def test_close_by_unassociated_interp(self):
        # The forced close is issued from a new interpreter that has neither
        # sent to nor received from the channel; the effect must be visible
        # here (recv fails, and a second close fails too).
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        interp = _interpreters.create()
        _interpreters.run_string(interp, dedent(f"""
            import _interpchannels as _channels
            _channels.close({cid}, force=True)
            """))
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.recv(cid)
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.close(cid)

    def test_close_used_multiple_times_by_single_user(self):
        # Three sends but only one recv: two items are still queued, so the
        # close has to be forced.
        cid = _channels.create(REPLACE)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'spam', blocking=False)
        _channels.send(cid, b'spam', blocking=False)
        recv_nowait(cid)
        _channels.close(cid, force=True)

        with self.assertRaises(_channels.ChannelClosedError):
            _channels.send(cid, b'eggs')
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.recv(cid)

    def test_channel_list_interpreters_invalid_channel(self):
        cid = _channels.create(REPLACE)
        # Test for invalid channel ID.
        with self.assertRaises(_channels.ChannelNotFoundError):
            _channels.list_interpreters(1000, send=True)

        _channels.close(cid)
        # Test for a channel that has been closed.
        with self.assertRaises(_channels.ChannelClosedError):
            _channels.list_interpreters(cid, send=True)

    def test_channel_list_interpreters_invalid_args(self):
        # Tests for invalid arguments passed to the API.
        cid = _channels.create(REPLACE)
        # Calling without the `send` argument is expected to be rejected.
        with self.assertRaises(TypeError):
            _channels.list_interpreters(cid)


class ChannelReleaseTests(TestBase):

    # XXX Add more test coverage a la the tests for close().

    # The three string blocks below are planning notes kept as bare string
    # expressions; they list the dimensions a fuller release() test matrix
    # would have to cover.

    """
    - main / interp / other
    - run in: current thread / new thread / other thread / different threads
    - end / opposite
    - force / no force
    - used / not used (associated / not associated)
    - empty / emptied / never emptied / partly emptied
    - closed / not closed
    - released / not released
    - creator (interp) / other
    - associated interpreter not running
    - associated interpreter destroyed
    """

    """
    use
    pre-release
    release
    after
    check
    """

    """
    release in: main, interp1
    creator: same, other (incl. interp2)

    use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
    pre-release: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
    pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all

    release: same
    release forced: same

    use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
    release after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
    check released: send/recv for same/other(incl.
interp2) check closed: send/recv for same/other(incl. interp2) """ def test_single_user(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) recv_nowait(cid) _channels.release(cid, send=True, recv=True) with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, b'eggs') with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) def test_multiple_users(self): cid = _channels.create(REPLACE) id1 = _interpreters.create() id2 = _interpreters.create() _interpreters.run_string(id1, dedent(f""" import _interpchannels as _channels _channels.send({cid}, b'spam', blocking=False) """)) out = _run_output(id2, dedent(f""" import _interpchannels as _channels obj, _ = _channels.recv({cid}) _channels.release({cid}) print(repr(obj)) """)) _interpreters.run_string(id1, dedent(f""" _channels.release({cid}) """)) self.assertEqual(out.strip(), "b'spam'") def test_no_kwargs(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) recv_nowait(cid) _channels.release(cid) with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, b'eggs') with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) def test_multiple_times(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) recv_nowait(cid) _channels.release(cid, send=True, recv=True) with self.assertRaises(_channels.ChannelClosedError): _channels.release(cid, send=True, recv=True) def test_with_unused_items(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) _channels.release(cid, send=True, recv=True) with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) def test_never_used(self): cid = _channels.create(REPLACE) _channels.release(cid) with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, b'spam') with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) def 
test_by_unassociated_interp(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels _channels.release({cid}) """)) obj = recv_nowait(cid) _channels.release(cid) with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, b'eggs') self.assertEqual(obj, b'spam') def test_close_if_unassociated(self): # XXX Something's not right with this test... cid = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels obj = _channels.send({cid}, b'spam', blocking=False) _channels.release({cid}) """)) with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) def test_partially(self): # XXX Is partial close too weird/confusing? cid = _channels.create(REPLACE) _channels.send(cid, None, blocking=False) recv_nowait(cid) _channels.send(cid, b'spam', blocking=False) _channels.release(cid, send=True) obj = recv_nowait(cid) self.assertEqual(obj, b'spam') def test_used_multiple_times_by_single_user(self): cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False) recv_nowait(cid) _channels.release(cid, send=True, recv=True) with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, b'eggs') with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) class ChannelCloseFixture(namedtuple('ChannelCloseFixture', 'end interp other extra creator')): # Set this to True to avoid creating interpreters, e.g. when # scanning through test permutations without running them. 
QUICK = False def __new__(cls, end, interp, other, extra, creator): assert end in ('send', 'recv') if cls.QUICK: known = {} else: interp = Interpreter.from_raw(interp) other = Interpreter.from_raw(other) extra = Interpreter.from_raw(extra) known = { interp.name: interp, other.name: other, extra.name: extra, } if not creator: creator = 'same' self = super().__new__(cls, end, interp, other, extra, creator) self._prepped = set() self._state = ChannelState() self._known = known return self @property def state(self): return self._state @property def cid(self): try: return self._cid except AttributeError: creator = self._get_interpreter(self.creator) self._cid = self._new_channel(creator) return self._cid def get_interpreter(self, interp): interp = self._get_interpreter(interp) self._prep_interpreter(interp) return interp def expect_closed_error(self, end=None): if end is None: end = self.end if end == 'recv' and self.state.closed == 'send': return False return bool(self.state.closed) def prep_interpreter(self, interp): self._prep_interpreter(interp) def record_action(self, action, result): self._state = result def clean_up(self): clean_up_interpreters() clean_up_channels() # internal methods def _new_channel(self, creator): if creator.name == 'main': return _channels.create(REPLACE) else: ch = _channels.create(REPLACE) run_interp(creator.id, f""" import _interpreters cid = _xxsubchannels.create() # We purposefully send back an int to avoid tying the # channel to the other interpreter. 
_xxsubchannels.send({ch}, int(cid), blocking=False) del _interpreters """) self._cid = recv_nowait(ch) return self._cid def _get_interpreter(self, interp): if interp in ('same', 'interp'): return self.interp elif interp == 'other': return self.other elif interp == 'extra': return self.extra else: name = interp try: interp = self._known[name] except KeyError: interp = self._known[name] = Interpreter(name) return interp def _prep_interpreter(self, interp): if interp.id in self._prepped: return self._prepped.add(interp.id) if interp.name == 'main': return run_interp(interp.id, f""" import _interpchannels as channels import test.test__interpchannels as helpers ChannelState = helpers.ChannelState try: cid except NameError: cid = _channels._channel_id({self.cid}) """) @unittest.skip('these tests take several hours to run') class ExhaustiveChannelTests(TestBase): """ - main / interp / other - run in: current thread / new thread / other thread / different threads - end / opposite - force / no force - used / not used (associated / not associated) - empty / emptied / never emptied / partly emptied - closed / not closed - released / not released - creator (interp) / other - associated interpreter not running - associated interpreter destroyed - close after unbound """ """ use pre-close close after check """ """ close in: main, interp1 creator: same, other, extra use: None,send,recv,send/recv in None,same,other,same+other,all pre-close: None,send,recv in None,same,other,same+other,all pre-close forced: None,send,recv in None,same,other,same+other,all close: same close forced: same use after: None,send,recv,send/recv in None,same,other,extra,same+other,all close after: None,send,recv,send/recv in None,same,other,extra,same+other,all check closed: send/recv for same/other(incl. 
interp2)
    """

    def iter_action_sets(self):
        """Yield every sequence of ChannelAction to run before the close.

        The order is: the empty sequence, close-only sequences (each also
        followed by one post-close use), then use sequences on their own
        and combined with close sequences (each again also followed by one
        post-close use).
        """
        # - used / not used (associated / not associated)
        # - empty / emptied / never emptied / partly emptied
        # - closed / not closed
        # - released / not released

        # never used
        yield []

        # only pre-closed (and possible used after)
        for closeactions in self._iter_close_action_sets('same', 'other'):
            yield closeactions
            for postactions in self._iter_post_close_action_sets():
                yield closeactions + postactions
        for closeactions in self._iter_close_action_sets('other', 'extra'):
            yield closeactions
            for postactions in self._iter_post_close_action_sets():
                yield closeactions + postactions

        # used
        for useactions in self._iter_use_action_sets('same', 'other'):
            yield useactions
            for closeactions in self._iter_close_action_sets('same', 'other'):
                actions = useactions + closeactions
                yield actions
                for postactions in self._iter_post_close_action_sets():
                    yield actions + postactions
            for closeactions in self._iter_close_action_sets('other', 'extra'):
                actions = useactions + closeactions
                yield actions
                for postactions in self._iter_post_close_action_sets():
                    yield actions + postactions
        for useactions in self._iter_use_action_sets('other', 'extra'):
            yield useactions
            for closeactions in self._iter_close_action_sets('same', 'other'):
                actions = useactions + closeactions
                yield actions
                for postactions in self._iter_post_close_action_sets():
                    yield actions + postactions
            for closeactions in self._iter_close_action_sets('other', 'extra'):
                actions = useactions + closeactions
                yield actions
                for postactions in self._iter_post_close_action_sets():
                    yield actions + postactions

    def _iter_use_action_sets(self, interp1, interp2):
        """Yield lists of 'use' actions involving the two given interps."""
        # Captured up front: the loops further down rebind interp1/interp2.
        interps = (interp1, interp2)

        # only recv end used
        yield [
            ChannelAction('use', 'recv', interp1),
            ]
        yield [
            ChannelAction('use', 'recv', interp2),
            ]
        yield [
            ChannelAction('use', 'recv', interp1),
            ChannelAction('use', 'recv', interp2),
            ]

        # never emptied
        yield [
            ChannelAction('use', 'send', interp1),
            ]
        yield [
            ChannelAction('use', 'send', interp2),
            ]
        yield [
            ChannelAction('use', 'send', interp1),
            ChannelAction('use', 'send', interp2),
            ]

        # partially emptied
        for interp1 in interps:
            for interp2 in interps:
                for interp3 in interps:
                    yield [
                        ChannelAction('use', 'send', interp1),
                        ChannelAction('use', 'send', interp2),
                        ChannelAction('use', 'recv', interp3),
                        ]

        # fully emptied
        for interp1 in interps:
            for interp2 in interps:
                for interp3 in interps:
                    for interp4 in interps:
                        yield [
                            ChannelAction('use', 'send', interp1),
                            ChannelAction('use', 'send', interp2),
                            ChannelAction('use', 'recv', interp3),
                            ChannelAction('use', 'recv', interp4),
                            ]

    def _iter_close_action_sets(self, interp1, interp2):
        """Yield lists of 'close'/'force-close' actions for the two interps.

        Single-action lists come first (forced before unforced), followed
        by recv+send pairs covering every op/interp combination.
        """
        ends = ('recv', 'send')
        interps = (interp1, interp2)
        for force in (True, False):
            op = 'force-close' if force else 'close'
            for interp in interps:
                for end in ends:
                    yield [
                        ChannelAction(op, end, interp),
                        ]
        for recvop in ('close', 'force-close'):
            for sendop in ('close', 'force-close'):
                for recv in interps:
                    for send in interps:
                        yield [
                            ChannelAction(recvop, 'recv', recv),
                            ChannelAction(sendop, 'send', send),
                            ]

    def _iter_post_close_action_sets(self):
        """Yield single-'use' lists (recv, then send) for each interp role."""
        for interp in ('same', 'extra', 'other'):
            yield [
                ChannelAction('use', 'recv', interp),
                ]
            yield [
                ChannelAction('use', 'send', interp),
                ]

    def run_actions(self, fix, actions):
        """Run each action in *actions*, in order, against fixture *fix*."""
        for action in actions:
            self.run_action(fix, action)

    def run_action(self, fix, action, *, hideclosed=True):
        """Run one action in the interpreter it resolves to; record the result.

        In the main interpreter the module-level run_action() is called
        directly.  Otherwise helpers.run_action() runs inside the target
        interpreter and the resulting state comes back over a temporary
        channel.
        """
        end = action.resolve_end(fix.end)
        interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
        fix.prep_interpreter(interp)
        if interp.name == 'main':
            result = run_action(
                fix.cid,
                action.action,
                end,
                fix.state,
                hideclosed=hideclosed,
                )
            fix.record_action(action, result)
        else:
            _cid = _channels.create(REPLACE)
            # The state is sent back as two bytes objects: `pending` as a
            # single little-endian byte and `closed` as b'X' or b''.
            # NOTE(review): to_bytes(1, ...) only fits values below 256.
            run_interp(interp.id, f"""
                result = helpers.run_action(
                    {fix.cid},
                    {repr(action.action)},
                    {repr(end)},
                    {repr(fix.state)},
                    hideclosed={hideclosed},
                    )
                _channels.send({_cid}, result.pending.to_bytes(1, 'little'), blocking=False)
                _channels.send({_cid}, b'X' if result.closed else b'', blocking=False)
                """)
            result = ChannelState(
                pending=int.from_bytes(recv_nowait(_cid), 'little'),
                closed=bool(recv_nowait(_cid)),
                )
            fix.record_action(action, result)

    def iter_fixtures(self):
        """Yield a ChannelCloseFixture per interpreter set, creator and end."""
        # XXX threads?
        interpreters = [
            ('main', 'interp', 'extra'),
            ('interp', 'main', 'extra'),
            ('interp1', 'interp2', 'extra'),
            ('interp1', 'interp2', 'main'),
        ]
        for interp, other, extra in interpreters:
            # NOTE(review): 'creator' is not one of the fixture's role
            # names, so it ends up being treated as an interpreter name --
            # confirm that this is intended.
            for creator in ('same', 'other', 'creator'):
                for end in ('send', 'recv'):
                    yield ChannelCloseFixture(end, interp, other, extra, creator)

    def _close(self, fix, *, force):
        """Close the fixture's end from its own interpreter.

        ChannelClosedError is expected if the fixture reports that the
        channel is already closed.
        """
        op = 'force-close' if force else 'close'
        close = ChannelAction(op, fix.end, 'same')
        if not fix.expect_closed_error():
            self.run_action(fix, close, hideclosed=False)
        else:
            with self.assertRaises(_channels.ChannelClosedError):
                self.run_action(fix, close, hideclosed=False)

    def _assert_closed_in_interp(self, fix, interp=None):
        """Check that recv, send, close and forced close all fail in *interp*.

        With no *interp*, or the main one, the checks run in the current
        interpreter.
        """
        if interp is None or interp.name == 'main':
            with self.assertRaises(_channels.ChannelClosedError):
                _channels.recv(fix.cid)
            with self.assertRaises(_channels.ChannelClosedError):
                _channels.send(fix.cid, b'spam')
            with self.assertRaises(_channels.ChannelClosedError):
                _channels.close(fix.cid)
            with self.assertRaises(_channels.ChannelClosedError):
                _channels.close(fix.cid, force=True)
        else:
            # These scripts use the `helpers`, `_channels` and `cid` names
            # already present in the interpreter's namespace.
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    _channels.recv(cid)
                """)
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    _channels.send(cid, b'spam', blocking=False)
                """)
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    _channels.close(cid)
                """)
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    _channels.close(cid, force=True)
                """)

    def _assert_closed(self, fix):
        """Check that the channel is closed everywhere it could be used."""
        self.assertTrue(fix.state.closed)

        # Receive as many items as the recorded state says are pending
        # before running the checks.
        for _ in range(fix.state.pending):
            recv_nowait(fix.cid)
        self._assert_closed_in_interp(fix)

        for interp in ('same', 'other'):
            interp = fix.get_interpreter(interp)
            if interp.name == 'main':
                # Already covered by the check above.
                continue
            self._assert_closed_in_interp(fix, interp)

        interp = fix.get_interpreter('fresh')
        self._assert_closed_in_interp(fix, interp)

    def _iter_close_tests(self, verbose=False):
        """Yield (index, fixture, actions) for at most 1000 permutations.

        Progress goes to stdout: one line per permutation when *verbose*,
        otherwise a dot per permutation in groups of six.
        """
        i = 0
        for actions in self.iter_action_sets():
            print()
            for fix in self.iter_fixtures():
                i += 1
                if i > 1000:
                    return
                if verbose:
                    if (i - 1) % 6 == 0:
                        print()
                    print(i, fix, '({} actions)'.format(len(actions)))
                else:
                    if (i - 1) % 6 == 0:
                        print(' ', end='')
                    print('.', end=''); sys.stdout.flush()
                yield i, fix, actions
            if verbose:
                print('---')
        print()

    # This is useful for scanning through the possible tests.
    def _skim_close_tests(self):
        # QUICK is set on the class and is not reset afterwards.
        ChannelCloseFixture.QUICK = True
        for i, fix, actions in self._iter_close_tests():
            pass

    def test_close(self):
        # For each permutation: run its actions, do an unforced close, then
        # check that the channel is closed in every interpreter.
        for i, fix, actions in self._iter_close_tests():
            with self.subTest('{} {} {}'.format(i, fix, actions)):
                fix.prep_interpreter(fix.interp)
                self.run_actions(fix, actions)

                self._close(fix, force=False)

                self._assert_closed(fix)
            # XXX Things slow down if we have too many interpreters.
            fix.clean_up()

    def test_force_close(self):
        # Same as test_close(), but with a forced close.
        for i, fix, actions in self._iter_close_tests():
            with self.subTest('{} {} {}'.format(i, fix, actions)):
                fix.prep_interpreter(fix.interp)
                self.run_actions(fix, actions)

                self._close(fix, force=True)

                self._assert_closed(fix)
            # XXX Things slow down if we have too many interpreters.
            fix.clean_up()


if __name__ == '__main__':
    unittest.main()